7-13 475 views
一、场景案例
需求:当开发上传某一个zip包时,自动解压到当前目录。已经解压过的zip包不再解压!
此处用到的模块有Python3自有模块os,sys,time,json,re,zipfile,logging, hashlib,traceback及自己写的send_weixin(微信发送模块)和logtest(日志文件模块)
二、写脚本前思路的分析
判断自己的pid是否存在,防止程序重复执行(基本上所有重复执行的脚本都需要这个判断哦)
查找上传目录下所有*.zip文件
比对zip文件,按文件名对比或者比对文件md5值(比对md5值会更加准确,可以避免同名但内容不同的zip包被误判为已解压而跳过)
没解压的解压,解压过的跳过。
解压后记录解压的文件信息,防止下次继续解压
三、贴上自己的send_weixin及logtest模块
# send_weixin module
#!/usr/bin/env python3
# encoding: utf-8
"""
@version: Python 3.8.2
@author: ycy
@file: send_weixin.py
@time: 2020/6/20 14:43
"""
import json
import os
import socket
import traceback

import requests

# NOTE(security): the corp id / secret below were hard-coded in the original
# listing and are kept only as backward-compatible defaults. Prefer supplying
# them through the environment and rotate any secret that has been published.
CORP_ID = os.environ.get('WEIXIN_CORP_ID', 'wwa28310f4ff14cd76')
CORP_SECRET = os.environ.get(
    'WEIXIN_CORP_SECRET', 'HZ2imHCjTLnNjWAD79U8K7vTHp6TmXbNiDUT5sxdSGU')

TOKEN_URL = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
SEND_URL = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}'

# Seconds to wait for each HTTP call; without a timeout a stalled connection
# would hang the calling script forever.
REQUEST_TIMEOUT = 10


def send_weixin(func_argv_content):
    """Send ``func_argv_content`` as a text message through qyapi.weixin.qq.com.

    :param func_argv_content: any JSON-serialisable object; it is dumped to a
        JSON string and used as the message text.
    :return: the ``requests.Response`` of the message/send call.
    :raises KeyError: if the gettoken response carries no ``access_token``.
    :raises requests.RequestException: on network failure or timeout.
    """
    token_resp = requests.get(
        TOKEN_URL,
        params={'corpid': CORP_ID, 'corpsecret': CORP_SECRET},
        timeout=REQUEST_TIMEOUT,
    )
    token_info = json.loads(token_resp.content)
    # The token is deliberately not printed: it is a credential.
    if 'access_token' not in token_info:
        raise KeyError('access_token missing from gettoken response: {}'.format(
            token_info.get('errmsg')))

    payload = {
        "toparty": "2",
        "msgtype": "text",
        "agentid": 1000002,
        "text": {
            "content": json.dumps(func_argv_content)
        },
        "safe": "0"
    }
    return requests.post(
        url=SEND_URL.format(token_info['access_token']),
        data=json.dumps(payload),
        timeout=REQUEST_TIMEOUT,
    )


def get_host_ip():
    '''
    Collect the IP address and host name of this server.

    The IP is the local address of a UDP socket connected to 8.8.8.8:80.
    Failures are printed and whatever was collected so far is returned, so the
    result may be missing ``host_ip`` and/or ``host_name``.

    :return: dict with the keys ``host_ip`` and ``host_name`` when available
    '''
    # Created before the try block so it always exists at return time.
    host_info = {}
    try:
        # The context manager closes the socket even if connect() fails, and
        # avoids touching an unbound name when socket() itself raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            host_info['host_ip'] = s.getsockname()[0]
        host_info['host_name'] = socket.gethostname()
    except OSError:
        print(traceback.format_exc())
    return host_info


if __name__ == '__main__':
    host_info = get_host_ip()
    mes = {}
    mes['data'] = 'test'
    mes['error'] = 'test'
    mes['host_info'] = host_info
    send_weixin(mes)


# logtest module
#!/usr/bin/env python3
# encoding: utf-8
"""
@version: Python 3.8.2
@author: ycy
@file: logtest.py
@time: 2020/6/20 16:16
"""
import logging

# One format shared by the file logger and the console logger.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s"


class _LoggerBase(object):
    """Thin wrapper around a private ``logging.Logger`` with a single handler."""

    def __init__(self, handler, level, topic=None):
        handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT))
        # A Logger is instantiated directly (not via getLogger), so every
        # wrapper owns its own logger and handlers never pile up on a shared one.
        self.logger = logging.Logger(topic, level=level)
        self.logger.addHandler(handler)

    # stacklevel=2 makes %(module)s report the module that called the wrapper
    # instead of always reporting this module (requires Python 3.8+).
    def debug(self, msg):
        self.logger.debug(msg, stacklevel=2)

    def info(self, msg):
        self.logger.info(msg, stacklevel=2)

    def warning(self, msg):
        self.logger.warning(msg, stacklevel=2)

    def error(self, msg):
        self.logger.error(msg, stacklevel=2)


class Loggerfile(_LoggerBase):
    """Logger that appends UTF-8 encoded records to ``file_path``."""

    def __init__(self, file_path, level, topic=None):
        file_handler = logging.FileHandler(file_path, 'a', encoding='utf-8')
        super().__init__(file_handler, level, topic)


class Loggerconsole(_LoggerBase):
    """Logger that writes records to the console through a StreamHandler."""

    def __init__(self, level, topic=None):
        super().__init__(logging.StreamHandler(), level, topic)


if __name__ == '__main__':
    logger = Loggerfile('test.log', logging.DEBUG, '日志主题')
    logger.info('我是日志信息info')
    # logger.error('我是日志信息Error')
    # logger.debug('我是日志信息Debug')
    loggerconsole = Loggerconsole(logging.INFO, '终端日志输出主题')
    loggerconsole.info('我是终端输出日志info')
四、主体解压脚本
#!/usr/bin/env python3
# encoding: utf-8
"""
@version: Python 3.8.2
@author: ycy
@file: auto_unzip.py.py
@time: 2020/6/20 11:13

Automatically unzip the zip packages found under /data/cdn/pub and
/data/cdn/beat.

Requirements:
  * a package that was already unzipped must not be unzipped again;
  * a package whose upload has not finished must not be unzipped;
  * the decision is made by comparing the packages found on disk with the
    packages listed in the record file (this version compares by file path).
"""
import os
import sys
import time
import json
import re
import zipfile
import traceback
import hashlib

import send_weixin
import logtest
import logging

# JSON file recording the zip packages that have already been unzipped
is_unzip_file = '/tmp/pycharm_project_108/Invm114/is_unzip'
# Source directories to scan, given as absolute paths
source_file_list = ['/data/cdn/beat', '/data/cdn/pub']

# File names that end in ".zip" and have at least one character before it
ZIP_NAME_RE = re.compile(r'.+\.zip$')

# Base content of every WeChat notification; filled in by main(). Defined at
# module level so unzip_file() also works when this file is imported.
msg = {}


def write_pid(func_argv_pid_file_name):
    """Write the pid of the current process into ``func_argv_pid_file_name``."""
    with open(func_argv_pid_file_name, 'w') as fp:
        fp.write(str(os.getpid()))


def read_pid(func_argv_pid_file_name):
    """Return the recorded pid if that process is still alive, else ``False``.

    A pid file may be left behind by a run that died without cleaning up, so
    its mere existence is not enough: the recorded process must still exist.
    """
    if not os.path.isfile(func_argv_pid_file_name):
        return False
    with open(func_argv_pid_file_name, 'r') as fp:
        pid = fp.read().strip()
    # An empty or corrupted pid file is treated as stale. Pid 0 is rejected
    # because kill(0, ...) addresses the caller's own process group.
    if not pid.isdigit() or int(pid) == 0:
        return False
    try:
        # Signal 0 performs the existence/permission check without delivering
        # a signal, and needs no shell command built from file contents.
        os.kill(int(pid), 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return pid
    return pid


def find_source_zip_file(func_argv_loggerfile, func_argv_source_path,
                         func_argv_find_zip_file_dict=None):
    """Collect every ``*.zip`` file below ``func_argv_source_path``.

    :param func_argv_loggerfile: logger used for the debug summary
    :param func_argv_source_path: directory that is walked recursively
    :param func_argv_find_zip_file_dict: optional dict to extend; a new dict is
        created when omitted (a ``{}`` default would be shared between calls)
    :return: dict mapping absolute zip path -> ``[directory, file name]``
    """
    if func_argv_find_zip_file_dict is None:
        func_argv_find_zip_file_dict = {}
    for path, _sub_dirs, file_list in os.walk(func_argv_source_path):
        for file_name in file_list:
            if ZIP_NAME_RE.search(file_name):
                source_file_abs = os.path.join(path, file_name)
                func_argv_find_zip_file_dict[source_file_abs] = [path, file_name]
    func_argv_loggerfile.debug(
        'this is source path({}),this is source file--->;({})'.format(
            func_argv_source_path, func_argv_find_zip_file_dict))
    return func_argv_find_zip_file_dict


def unzip_file(func_argv_loggerfile, func_file_list):
    """Extract one zip package into the directory it lives in.

    :param func_argv_loggerfile: logger for the result
    :param func_file_list: ``[directory, file name, ...]`` of the package
    :return: ``func_file_list`` when the package was extracted, ``None`` when
        the file is not a valid zip (for example while it is still uploading)
    """
    abs_file = os.path.join(func_file_list[0], func_file_list[1])
    print(abs_file)
    if not zipfile.is_zipfile(abs_file):
        print('这不是个有效的zip文件--->;[{}]'.format(abs_file))
        func_argv_loggerfile.warning(
            'this is not a valid zip file--->;({})'.format(abs_file))
        return None

    with zipfile.ZipFile(abs_file) as zf:
        # Extract next to the archive itself
        zf.extractall(path=func_file_list[0])
    print('解压成功--->;[{}]'.format(abs_file))
    info = 'this file is unzipped successfully--->;({})'.format(abs_file)
    func_argv_loggerfile.info(info)
    # Work on a copy so a later error report does not carry a stale 'info'.
    notice = dict(msg, info=info)
    try:
        send_weixin.send_weixin(notice)
    except Exception:
        # The extraction itself succeeded; a failed notification must not make
        # the package look un-extracted and be unzipped again on the next run.
        func_argv_loggerfile.error(traceback.format_exc())
    return func_file_list


def running(func_argv_loggerfile, is_unzip_file):
    """Unzip every package that is not listed in the record file yet.

    :param func_argv_loggerfile: logger passed on to the helpers
    :param is_unzip_file: path of the JSON record of unzipped packages
    """
    # Load the record if it exists, otherwise start with an empty one
    if os.path.isfile(is_unzip_file):
        with open(is_unzip_file, 'r') as f:
            is_unzip_file_dict = json.load(f)
    else:
        is_unzip_file_dict = {}

    # Find all candidate packages in every source directory
    find_zip_file_dict = {}
    for source_path in source_file_list:
        find_zip_file_dict = find_source_zip_file(
            func_argv_loggerfile, source_path, find_zip_file_dict)

    # Split into packages that are already recorded and packages still to do
    unzipped = {}
    pending = {}
    for find_file, file_info in find_zip_file_dict.items():
        if find_file in is_unzip_file_dict:
            func_argv_loggerfile.debug(
                'This file is already in the record file and will not be decompressed--->;({})'.format(find_file))
            print('此{}文件已解压过了,不进行解压'.format(find_file))
            unzipped[find_file] = file_info
        else:
            pending[find_file] = file_info

    try:
        for find_file, file_info in pending.items():
            # Only packages that were really extracted are recorded; invalid
            # (e.g. half-uploaded) ones stay pending and are retried next run.
            if unzip_file(func_argv_loggerfile, file_info) is not None:
                unzipped[find_file] = file_info
    finally:
        # Persist the record even when one extraction raised, so finished
        # packages are not extracted a second time.
        with open(is_unzip_file, 'w') as f:
            json.dump(unzipped, f)


def main():
    """Script entry point: set up logging, guard against double runs, unzip."""
    loggerfile = None
    pid_file_name = '.'.join([sys.argv[0], 'pid'])
    pid_written = False
    try:
        now_time = time.strftime("%Y%m%d", time.localtime(time.time()))
        msg['script_name'] = sys.argv[0]
        msg['host_info'] = send_weixin.get_host_ip()
        # Log file location; create the directory so the first run cannot fail
        log_path = '/tmp/pycharm_project_108/Invm114/logs/{}_unzip.log'.format(now_time)
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        loggerfile = logtest.Loggerfile(log_path, logging.DEBUG, 'unzip_client_package')

        if read_pid(pid_file_name):
            loggerfile.warning('the script is running,please retry again...')
            print('pid文件存在,请稍后再试')
            # SystemExit is not an Exception subclass, so it skips the handler
            # below, and the pid file of the running instance is left alone.
            sys.exit(1)

        write_pid(pid_file_name)
        pid_written = True
        running(loggerfile, is_unzip_file)
    except Exception:
        error_text = traceback.format_exc()
        # Report locally first so the error survives a failing notification
        print(error_text)
        if loggerfile is not None:
            loggerfile.error(error_text)
        msg['error'] = error_text
        send_weixin.send_weixin(msg)
        sys.exit(1)
    finally:
        # Remove the pid file only if this process created it
        if pid_written and os.path.isfile(pid_file_name):
            os.remove(pid_file_name)


if __name__ == '__main__':
    main()
上面的脚本存在一个问题:它是根据文件名来判断是否已经解压的,如果开发上传了一个同名但内容不同的zip包,这个同名的zip包就不会再次解压了。因此还需要比对文件的md5值,这样判断会更加准确!
在上面的基础上修改了下,这个就是按md5值来判断是否已经解压了的:
#!/usr/bin/env python3
# encoding: utf-8
"""
@version: Python 3.8.2
@author: ycy
@file: auto_unzip.py.py
@time: 2020/6/20 11:13

Automatically unzip the zip packages found under /data/cdn/pub and
/data/cdn/beat.

Requirements:
  * a package that was already unzipped must not be unzipped again;
  * a package whose upload has not finished must not be unzipped;
  * the decision is made by comparing the packages found on disk with the
    packages listed in the record file (this version compares md5 digests).
"""
import os
import sys
import time
import json
import re
import zipfile
import traceback
import hashlib

import send_weixin
import logtest
import logging

# JSON file recording the zip packages that have already been unzipped
is_unzip_file = '/tmp/pycharm_project_108/Invm114/is_unzip'
# Source directories to scan, given as absolute paths
source_file_list = ['/data/cdn/beat', '/data/cdn/pub']

# File names that end in ".zip" and have at least one character before it
ZIP_NAME_RE = re.compile(r'.+\.zip$')

# Number of bytes hashed per read, so large archives never sit fully in memory
MD5_CHUNK_SIZE = 1024 * 1024

# Base content of every WeChat notification; filled in by main(). Defined at
# module level so unzip_file() also works when this file is imported.
msg = {}


def write_pid(func_argv_pid_file_name):
    """Write the pid of the current process into ``func_argv_pid_file_name``."""
    with open(func_argv_pid_file_name, 'w') as fp:
        fp.write(str(os.getpid()))


def read_pid(func_argv_pid_file_name):
    """Return the recorded pid if that process is still alive, else ``False``.

    A pid file may be left behind by a run that died without cleaning up, so
    its mere existence is not enough: the recorded process must still exist.
    """
    if not os.path.isfile(func_argv_pid_file_name):
        return False
    with open(func_argv_pid_file_name, 'r') as fp:
        pid = fp.read().strip()
    # An empty or corrupted pid file is treated as stale. Pid 0 is rejected
    # because kill(0, ...) addresses the caller's own process group.
    if not pid.isdigit() or int(pid) == 0:
        return False
    try:
        # Signal 0 performs the existence/permission check without delivering
        # a signal, and needs no shell command built from file contents.
        os.kill(int(pid), 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return pid
    return pid


def caclMD5(filepath):
    """Return the hexadecimal md5 digest of the file at ``filepath``."""
    md5obj = hashlib.md5()
    with open(filepath, 'rb') as f:
        # Hash in chunks instead of loading the whole archive at once
        for chunk in iter(lambda: f.read(MD5_CHUNK_SIZE), b''):
            md5obj.update(chunk)
    return md5obj.hexdigest()


def find_source_zip_file(func_argv_loggerfile, func_argv_source_path,
                         func_argv_find_zip_file_dict=None):
    """Collect every ``*.zip`` file below ``func_argv_source_path``.

    :param func_argv_loggerfile: logger used for the debug summary
    :param func_argv_source_path: directory that is walked recursively
    :param func_argv_find_zip_file_dict: optional dict to extend; a new dict is
        created when omitted (a ``{}`` default would be shared between calls)
    :return: dict mapping absolute zip path -> ``[directory, file name, md5]``
    """
    if func_argv_find_zip_file_dict is None:
        func_argv_find_zip_file_dict = {}
    for path, _sub_dirs, file_list in os.walk(func_argv_source_path):
        for file_name in file_list:
            if ZIP_NAME_RE.search(file_name):
                source_file_abs = os.path.join(path, file_name)
                file_md5 = caclMD5(source_file_abs)
                func_argv_find_zip_file_dict[source_file_abs] = [
                    path, file_name, file_md5]
    func_argv_loggerfile.debug(
        'this is source path({}),this is source file--->;({})'.format(
            func_argv_source_path, func_argv_find_zip_file_dict))
    return func_argv_find_zip_file_dict


def unzip_file(func_argv_loggerfile, func_file_list):
    """Extract one zip package into the directory it lives in.

    :param func_argv_loggerfile: logger for the result
    :param func_file_list: ``[directory, file name, ...]`` of the package
    :return: ``func_file_list`` when the package was extracted, ``None`` when
        the file is not a valid zip (for example while it is still uploading)
    """
    abs_file = os.path.join(func_file_list[0], func_file_list[1])
    print(abs_file)
    if not zipfile.is_zipfile(abs_file):
        print('这不是个有效的zip文件--->;[{}]'.format(abs_file))
        func_argv_loggerfile.warning(
            'this is not a valid zip file--->;({})'.format(abs_file))
        return None

    with zipfile.ZipFile(abs_file) as zf:
        # Extract next to the archive itself
        zf.extractall(path=func_file_list[0])
    print('解压成功--->;[{}]'.format(abs_file))
    info = 'this file is unzipped successfully--->;({})'.format(abs_file)
    func_argv_loggerfile.info(info)
    # Work on a copy so a later error report does not carry a stale 'info'.
    notice = dict(msg, info=info)
    try:
        send_weixin.send_weixin(notice)
    except Exception:
        # The extraction itself succeeded; a failed notification must not make
        # the package look un-extracted and be unzipped again on the next run.
        func_argv_loggerfile.error(traceback.format_exc())
    return func_file_list


def _is_recorded(record, zip_path, file_md5):
    """Tell whether ``zip_path`` with digest ``file_md5`` is in ``record``."""
    recorded_info = record.get(zip_path)
    # Records written by the name-based version have no md5 entry; such
    # packages are treated as not unzipped yet.
    return (isinstance(recorded_info, list)
            and len(recorded_info) > 2
            and recorded_info[2] == file_md5)


def running(func_argv_loggerfile, is_unzip_file):
    """Unzip every package whose path and md5 are not in the record file yet.

    :param func_argv_loggerfile: logger passed on to the helpers
    :param is_unzip_file: path of the JSON record of unzipped packages
    """
    # Load the record if it exists, otherwise start with an empty one
    if os.path.isfile(is_unzip_file):
        with open(is_unzip_file, 'r') as f:
            is_unzip_file_dict = json.load(f)
    else:
        is_unzip_file_dict = {}

    # Find all candidate packages in every source directory
    find_zip_file_dict = {}
    for source_path in source_file_list:
        find_zip_file_dict = find_source_zip_file(
            func_argv_loggerfile, source_path, find_zip_file_dict)

    # A package counts as unzipped only if the same path was recorded with the
    # same md5. Matching the md5 anywhere in the record would skip an identical
    # archive uploaded to another directory, where it was never extracted.
    unzipped = {}
    pending = {}
    for find_file, file_info in find_zip_file_dict.items():
        if _is_recorded(is_unzip_file_dict, find_file, file_info[2]):
            func_argv_loggerfile.debug(
                'This file is already in the record file and will not be decompressed--->;({})'.format(find_file))
            print('此{}文件已解压过了,不进行解压'.format(find_file))
            unzipped[find_file] = file_info
        else:
            pending[find_file] = file_info

    try:
        for find_file, file_info in pending.items():
            # Only packages that were really extracted are recorded; invalid
            # (e.g. half-uploaded) ones stay pending and are retried next run.
            if unzip_file(func_argv_loggerfile, file_info) is not None:
                unzipped[find_file] = file_info
    finally:
        # Persist the record even when one extraction raised, so finished
        # packages are not extracted a second time.
        with open(is_unzip_file, 'w') as f:
            json.dump(unzipped, f)


def main():
    """Script entry point: set up logging, guard against double runs, unzip."""
    loggerfile = None
    pid_file_name = '.'.join([sys.argv[0], 'pid'])
    pid_written = False
    try:
        now_time = time.strftime("%Y%m%d", time.localtime(time.time()))
        msg['script_name'] = sys.argv[0]
        msg['host_info'] = send_weixin.get_host_ip()
        # Log file location; create the directory so the first run cannot fail
        log_path = '/tmp/pycharm_project_108/Invm114/logs/{}_unzip.log'.format(now_time)
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        loggerfile = logtest.Loggerfile(log_path, logging.DEBUG, 'unzip_client_package')

        if read_pid(pid_file_name):
            loggerfile.warning('the script is running,please retry again...')
            print('pid文件存在,请稍后再试')
            # SystemExit is not an Exception subclass, so it skips the handler
            # below, and the pid file of the running instance is left alone.
            sys.exit(1)

        write_pid(pid_file_name)
        pid_written = True
        running(loggerfile, is_unzip_file)
    except Exception:
        error_text = traceback.format_exc()
        # Report locally first so the error survives a failing notification
        print(error_text)
        if loggerfile is not None:
            loggerfile.error(error_text)
        msg['error'] = error_text
        send_weixin.send_weixin(msg)
        sys.exit(1)
    finally:
        # Remove the pid file only if this process created it
        if pid_written and os.path.isfile(pid_file_name):
            os.remove(pid_file_name)


if __name__ == '__main__':
    main()
感谢您的阅读,zipfile+logging+hashlib+json等模块的小小案例应用就介绍到这里了。我们下期再见~
版权属于: 抓不住的疯
转载时必须以链接形式注明原始出处及本声明。