diff --git a/src/core.py b/src/core.py index f84a9c7..e29f3ad 100644 --- a/src/core.py +++ b/src/core.py @@ -26,19 +26,11 @@ # #################################################################################### -from copy import copy, deepcopy +from time import sleep, perf_counter,time,strftime,localtime -from time import sleep, perf_counter -#from threading import Thread from multiprocessing import Process, Manager, Queue from queue import Empty -import multiprocessing - -from zstandard import ZstdCompressor,ZstdDecompressor - -from zipfile import ZipFile - from os import scandir,stat,sep from os import remove as os_remove from os import cpu_count @@ -46,6 +38,8 @@ from os.path import abspath,normpath,basename from os.path import join as path_join +from zipfile import ZipFile + from platform import system as platform_system from platform import release as platform_release from platform import node as platform_node @@ -59,15 +53,16 @@ from collections import defaultdict -from time import time,strftime,localtime - from pickle import dumps,loads from difflib import SequenceMatcher +from pathlib import Path as pathlib_Path + +from zstandard import ZstdCompressor,ZstdDecompressor + from executor import Executor -from pathlib import Path as pathlib_Path def bytes_to_str(num): if num < 1024: @@ -95,9 +90,9 @@ def str_to_bytes(string): units = {'kb': 1024,'mb': 1024*1024,'gb': 1024*1024*1024,'tb': 1024*1024*1024*1024, 'b':1} try: string = string.replace(' ','') - for suffix in units: + for suffix,weight in units.items(): if string.lower().endswith(suffix): - return int(string[0:-len(suffix)]) * units[suffix] + return int(string[0:-len(suffix)]) * weight return int(string) except: @@ -117,7 +112,7 @@ def byte_to_bools(byte, num_bools=8): bool_list = [False]*num_bools for i in range(num_bools): - if (byte & (1 << i)): + if byte & (1 << i): bool_list[num_bools-1-i]=True return tuple(bool_list) @@ -128,17 +123,19 @@ def test_regexp(expr): search(expr,teststring) except Exception as e: return e - else: - return None -entry_LUT_encode={} -entry_LUT_decode={} + return None -for i in range(256): - temp_tuple = entry_LUT_decode[i]=byte_to_bools(i) - entry_LUT_encode[temp_tuple]=i - #print(i, temp_tuple) +LUT_encode={} +LUT_decode={} +def prepare_LUTs(): + #global LUT_decode,LUT_encode + for i in range(256): + temp_tuple = LUT_decode[i]=byte_to_bools(i) + LUT_encode[temp_tuple]=i + +prepare_LUTs() ####################################################################### data_format_version='1.0010' @@ -181,7 +178,6 @@ def __init__(self,label,scan_path,log): self.info_line_current = '' self.abort_action = False - #self.files_search_progress = 0 #self.crc_progress_info=0 @@ -313,7 +309,7 @@ def scan(self,cde_list,check_dev=True): ######################### filenames_set=set() - self.scan_rec(self.header.scan_path,self.scan_data,filenames_set) + self.scan_rec(self.header.scan_path,self.scan_data,filenames_set,check_dev=check_dev) self.filenames = tuple(sorted(list(filenames_set))) ######################### @@ -329,7 +325,7 @@ def scan(self,cde_list,check_dev=True): if cde_list: self.log.info('estimating CD pool') - self.info_line = f'estimating files pool for custom data extraction' + self.info_line = 'estimating files pool for custom data extraction' self.prepare_customdata_pool_rec(self.scan_data,[]) self.info_line = '' @@ -345,12 +341,12 @@ def prepare_customdata_pool_rec(self,scan_like_data,parent_path): self_customdata_pool = self.customdata_pool for entry_name,items_list in scan_like_data.items(): + size,is_dir,is_file,is_symlink,is_bind,has_files,mtime = items_list[0:7] + if self.abort_action: break try: - #is_dir,is_file,is_symlink,is_bind,has_files,size,mtime = items_list - size,is_dir,is_file,is_symlink,is_bind,has_files,mtime = items_list[0:7] - subpath_list = parent_path.copy() + [entry_name] + subpath_list = parent_path + [entry_name] if not is_symlink and not is_bind: if is_dir: @@ -411,7 +407,7 @@ def extract_customdata(self): self_header = self.header scan_path = self_header.scan_path - self.info_line = f'custom data extraction ...' + self.info_line = 'custom data extraction ...' self_header.files_cde_quant = 0 self_header.files_cde_size = 0 @@ -421,7 +417,6 @@ def extract_customdata(self): cde_list = self.header.cde_list - customdata_helper={} cd_index=0 @@ -437,8 +432,6 @@ def extract_customdata(self): full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep) - #cde_run_list = list(executable) + [full_file_path] - io_list.append( [ executable,parameters,full_file_path,timeout,shell,do_crc,size,scan_like_list,rule_nr ] ) ############################################################# @@ -472,53 +465,6 @@ def extract_customdata(self): ############################################################# - if False: - for (scan_like_list,subpath,rule_nr) in self.customdata_pool.values(): - if self.abort_action: - break - - expressions,use_smin,smin_int,use_smax,smax_int,executable,parameters,shell,timeout,crc = cde_list[rule_nr] - - full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep) - - size = scan_like_list[0] - - cde_run_list = list(executable) + list(parameters) + [full_file_path] - - if crc: - self.info_line_current = f'{subpath} CRC calculation ({bytes_to_str(size)})' - crc_val = self_calc_crc(full_file_path,size) - - self.info_line_current = f'{subpath} ({bytes_to_str(size)})' - - cd_ok,output = exe_run(cde_run_list,shell,timeout) - - if not cd_ok: - self_header.files_cde_errors_quant +=1 - - new_elem={} - new_elem['cd_ok']=cd_ok - - if output not in customdata_helper: - customdata_helper[output]=cd_index - new_elem['cd_index']=cd_index - cd_index+=1 - - self_customdata_append(output) - else: - new_elem['cd_index']=customdata_helper[output] - - if crc: - new_elem['crc_val']=crc_val - - scan_like_list.append(new_elem) - - self_header.files_cde_quant += 1 - self_header.files_cde_size += size - self_header.files_cde_size_extracted += getsizeof(output) - - self.info_line_current = '' - del self.customdata_pool del customdata_helper @@ -526,7 +472,7 @@ def extract_customdata(self): ############################################################# def tupelize_rec(self,scan_like_data): - entry_LUT_encode_loc = entry_LUT_encode + LUT_encode_loc = LUT_encode self_tupelize_rec = self.tupelize_rec @@ -568,7 +514,7 @@ def tupelize_rec(self,scan_like_data): else: has_crc = False - code_new = entry_LUT_encode_loc[ (is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc) ] + code_new = LUT_encode_loc[ (is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc) ] sub_list_elem=[entry_name_index,code_new,size,mtime] @@ -600,14 +546,14 @@ def pack_data(self): cd_ok = False has_crc = False - code = entry_LUT_encode[ (is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc) ] + code = LUT_encode[ (is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc) ] self.filestructure = ('',code,size,mtime,self.tupelize_rec(self.scan_data)) del self.filenames_helper del self.scan_data def clone_record_rec(self,cd_org,filenames_org,tuple_like_data,keep_cd,keep_crc): - entry_LUT_decode_loc = entry_LUT_decode + LUT_decode_loc = LUT_decode self_get_file_name = self.get_file_name self_clone_record_rec = self.clone_record_rec @@ -617,7 +563,7 @@ def clone_record_rec(self,cd_org,filenames_org,tuple_like_data,keep_cd,keep_crc) else: name='' - is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc = entry_LUT_decode_loc[code] + is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc = LUT_decode_loc[code] if not keep_cd or not keep_crc: has_cd = has_cd and keep_cd if not has_cd: @@ -625,7 +571,7 @@ def clone_record_rec(self,cd_org,filenames_org,tuple_like_data,keep_cd,keep_crc) has_crc = has_crc and keep_crc - code = entry_LUT_encode[ (is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc) ] + code = LUT_encode[ (is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc) ] new_list = [name_index,code,size,mtime] @@ -667,7 +613,6 @@ def clone_record(self,file_path,keep_cd=True,keep_crc=True,compression_level=16) ######################################################################################## def find_items(self, results_queue,abort_queue, - record_nr, size_min,size_max, filename_search_kind,name_func_to_call,cd_search_kind,cd_func_to_call): @@ -678,13 +623,13 @@ def find_items(self, filenames_loc = self.filenames filestructure = self.filestructure - files_search_progress = 0 - files_search_progress_update_quant = 0 + search_progress = 0 + search_progress_update_quant = 0 if cd_search_kind!='dont': self.decompress_customdata() - entry_LUT_decode_loc = entry_LUT_decode + LUT_decode_loc = LUT_decode use_size = bool(size_min or size_max) @@ -719,19 +664,18 @@ def check_abort(): if check_abort(): break - files_search_progress_update_quant+=1 - if files_search_progress_update_quant>4096: - results_queue.put((files_search_progress,None)) - - files_search_progress_update_quant=0 + search_progress_update_quant+=1 + if search_progress_update_quant>4096: + results_queue.put((search_progress,None)) #just update progress bar + search_progress_update_quant=0 - files_search_progress +=1 + search_progress +=1 name_nr,code,size,mtime = data_entry[0:4] name = filenames_loc[name_nr] - is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc = entry_LUT_decode_loc[code] + is_dir,is_file,is_symlink,is_bind,has_cd,has_files,cd_ok,has_crc = LUT_decode_loc[code] elem_index=4 if has_files: @@ -753,7 +697,7 @@ def check_abort(): #katalog moze spelniac kryteria naazwy pliku ale nie ma rozmiaru i custom data if name_func_to_call: if name_func_to_call(name): - results_queue.put((files_search_progress,tuple([tuple(next_level),size,mtime]))) + results_queue.put((search_progress,tuple([tuple(next_level),size,mtime]))) if sub_data: search_list_append( (sub_data,next_level) ) @@ -811,9 +755,9 @@ def check_abort(): else: continue - results_queue.put((files_search_progress,tuple([tuple(next_level),size,mtime ]))) + results_queue.put((search_progress,tuple([tuple(next_level),size,mtime ]))) - results_queue.put((files_search_progress,None)) + results_queue.put((search_progress,None)) def find_items_sort(self,what,reverse): if what=='data': @@ -823,10 +767,10 @@ def find_items_sort(self,what,reverse): elif what=='ctime': self.find_results.sort(key = lambda x : (x[0][0:-1],x[2]),reverse=reverse) else: - print('unknown sorting',what,mod) + print('error unknown sorting',what,reverse) def prepare_info(self): - bytes_to_str_mod = lambda x : bytes_to_str(x) if type(x) == int else x + bytes_to_str_mod = lambda x : bytes_to_str(x) if isinstance(x,int) else x info_list = [] @@ -834,7 +778,7 @@ def prepare_info(self): self.txtinfo_basic = 'init-basic' try: - self.FILE_SIZE = stat(self.file_path).st_size + self.FILE_SIZE = stat(self.file_path).st_size except Exception as e: print('prepare_info stat error:%s' % e ) else: @@ -847,22 +791,22 @@ def prepare_info(self): local_time = strftime('%Y/%m/%d %H:%M:%S',localtime(self.header.creation_time)) info_list.append(f'record label : {self_header.label}') - info_list.append(f'') + info_list.append('') info_list.append(f'scanned path : {self_header.scan_path}') info_list.append(f'scanned space : {bytes_to_str(self_header.sum_size)}') info_list.append(f'scanned files : {fnumber(self_header.quant_files)}') info_list.append(f'scanned folders : {fnumber(self_header.quant_folders)}') - info_list.append(f'') + info_list.append('') info_list.append(f'creation host : {self_header.creation_host} ({self_header.creation_os})') info_list.append(f'creation time : {local_time}') self.txtinfo_basic = '\n'.join(info_list) - info_list.append(f'') + info_list.append('') info_list.append(f'database file : {self.FILE_NAME} ({bytes_to_str(self.FILE_SIZE)})') - info_list.append(f'') - info_list.append(f'internal sizes : compressed decompressed') - info_list.append(f'') + info_list.append('') + info_list.append('internal sizes : compressed decompressed') + info_list.append('') info_list.append(f'header :{bytes_to_str_mod(zip_file_info["header"]).rjust(14)}{bytes_to_str_mod(self.zipinfo["header"]).rjust(14)}') info_list.append(f'filestructure :{bytes_to_str_mod(zip_file_info["filestructure"]).rjust(14)}{bytes_to_str_mod(self.zipinfo["filestructure"]).rjust(14)}') @@ -941,7 +885,6 @@ def load(self,file_path): self.prepare_info() - global data_format_version if self.header.data_format_version != data_format_version: self.log.error(f'incompatible data format version error: {self.header.data_format_version} vs {data_format_version}') return True @@ -949,13 +892,12 @@ def load(self,file_path): except Exception as e: print('loading error:%s' % e ) return True - else: - return False + + return False decompressed_filestructure = False def decompress_filestructure(self): if not self.decompressed_filestructure: - with ZipFile(self.file_path, "r") as zip_file: decompressor = ZstdDecompressor() @@ -972,13 +914,12 @@ def decompress_filestructure(self): self.prepare_info() return True - else: - return False + + return False decompressed_customdata = False def decompress_customdata(self): if not self.decompressed_customdata: - with ZipFile(self.file_path, "r") as zip_file: try: customdata_ser_comp = zip_file.read('customdata') @@ -993,11 +934,11 @@ def decompress_customdata(self): self.prepare_info() return True - else: - return False + + return False ####################################################################### -def find_items_for_subprocess(results_queue,abort_queue,record_file_path,record_nr,size_min,size_max,find_filename_search_kind,name_func_to_call,find_cd_search_kind,cd_func_to_call): +def find_items_for_subprocess(record_info,results_queue,abort_queue,record_file_path,size_min,size_max,find_filename_search_kind,name_func_to_call,find_cd_search_kind,cd_func_to_call): t1 = perf_counter() new_record = LibrerRecord('dummylabel','dummyscanpath','dummylog') @@ -1005,13 +946,13 @@ def find_items_for_subprocess(results_queue,abort_queue,record_file_path,record_ if new_record.load(record_file_path): print('find_items_for_subprocess error:',record_file_path) else: - new_record.find_items(results_queue,abort_queue,record_nr, + new_record.find_items(results_queue,abort_queue, size_min,size_max, find_filename_search_kind,name_func_to_call, find_cd_search_kind,cd_func_to_call) t2 = perf_counter() - print('timing record_nr:',record_nr,t2-t1) + print(f'timing {record_info}: {t2-t1}') ####################################################################### class LibrerCore: @@ -1195,11 +1136,11 @@ def find_items_in_records(self, records_to_process = [range_par] if range_par else list(self.records) records_to_process.sort(reverse = True,key = lambda x : x.header.quant_files) + #for record in records_to_process: # print(record.header.label,'\t',record.header.quant_files) - - self.files_search_progress = 0 + self.total_search_progress = 0 self.search_record_nr=0 #self.search_record_ref=None @@ -1209,7 +1150,7 @@ def find_items_in_records(self, self.abort_action = False for record in records_to_process: record.abort_action = False - ############################################################ + ############################################################ max_processes = cpu_count() @@ -1231,7 +1172,7 @@ def find_items_in_records(self, abort_queue = Queue() abort_queues.append(abort_queue) - subprocess = Process(target=find_items_for_subprocess, args=(results_queue,abort_queue,record.file_path,record_nr,size_min,size_max, + subprocess = Process(target=find_items_for_subprocess, args=(record.header.label,results_queue,abort_queue,record.file_path,size_min,size_max, find_filename_search_kind,name_func_to_call, find_cd_search_kind,cd_func_to_call)) @@ -1244,30 +1185,30 @@ def find_items_in_records(self, def suck_queue(q,l): got=0 - last_i=0 + last_processed=0 try: q_get = q.get l_append = l.append while val := q_get(False): #processed,found_data - (i,f)=val - last_i=i + (processed,f)=val + last_processed=processed got+=1 if f:#None tylko update i l_append(f) except Empty: pass - return last_i + return last_processed ############################################################ while True: if self.abort_action: _ = [abort_queues[record_nr].put(True) for record_nr in range(records_to_process_len)] - need_to_run = [ record_nr for record_nr in range(records_to_process_len) if jobs[record_nr][0]==False ] + need_to_run = [ record_nr for record_nr in range(records_to_process_len) if not jobs[record_nr][0] ] need_to_run_len = len(need_to_run) - running = len([record_nr for record_nr in range(records_to_process_len) if jobs[record_nr][0]==True and jobs[record_nr][1].is_alive() ]) + running = len([record_nr for record_nr in range(records_to_process_len) if jobs[record_nr][0] and jobs[record_nr][1].is_alive() ]) self.search_record_nr = records_to_process_len-running-need_to_run_len self.records_perc_info = self.search_record_nr * 100.0 / records_to_process_len @@ -1285,14 +1226,14 @@ def suck_queue(q,l): for record_nr in range(records_to_process_len): if jobs[record_nr][0]: - i = suck_queue(results_queues[record_nr],records_to_process[record_nr].find_results) - if i>total_progress[record_nr]: - total_progress[record_nr] = i + processed = suck_queue(results_queues[record_nr],records_to_process[record_nr].find_results) + if processed>total_progress[record_nr]: + total_progress[record_nr] = processed - self.files_search_progress = sum(total_progress) + self.total_search_progress = sum(total_progress) self.find_res_quant = sum([len(record.find_results) for record in records_to_process]) - if running==0 and need_to_run_len==0 and all([results_queues[record_nr].empty() for record_nr in range(records_to_process_len)]): + if running==0 and need_to_run_len==0 and all(results_queues[record_nr].empty() for record_nr in range(records_to_process_len)): break sleep(0.1) @@ -1318,4 +1259,3 @@ def delete_record_by_id(self,rid): self.log.error(e) self.update_sorted() - diff --git a/src/executor.py b/src/executor.py index f86c40d..debb98d 100644 --- a/src/executor.py +++ b/src/executor.py @@ -28,7 +28,6 @@ from subprocess import Popen, STDOUT, PIPE,TimeoutExpired from time import time -from psutil import Process from signal import SIGTERM from hashlib import sha1 from os import sep @@ -36,8 +35,10 @@ from os import name as os_name from sys import getsizeof +from psutil import Process + windows = bool(os_name=='nt') -param_indicator = '%' +PARAM_INDICATOR_SIGN = '%' def get_command_list(executable,parameters,full_file_path,shell=False): if windows: @@ -48,13 +49,13 @@ def get_command_list(executable,parameters,full_file_path,shell=False): full_file_path = f'"{full_file_path}"' if parameters: - if param_indicator not in parameters: + if PARAM_INDICATOR_SIGN not in parameters: return None - parameters = parameters.replace(f'"{param_indicator}"',param_indicator) + parameters = parameters.replace(f'"{PARAM_INDICATOR_SIGN}"',PARAM_INDICATOR_SIGN) else: - parameters=param_indicator + parameters=PARAM_INDICATOR_SIGN - parameters_list = [p_elem.replace(param_indicator,full_file_path) for p_elem in parameters.split() if p_elem] + parameters_list = [p_elem.replace(PARAM_INDICATOR_SIGN,full_file_path) for p_elem in parameters.split() if p_elem] single_command_list = [executable] + parameters_list @@ -75,7 +76,6 @@ def __init__(self,io_list,callback): self.files_cde_size = 0 self.files_cde_size_extracted = 0 - def calc_crc(self,fullpath,size): CRC_BUFFER_SIZE=4*1024*1024 buf = bytearray(CRC_BUFFER_SIZE) @@ -87,36 +87,37 @@ def calc_crc(self,fullpath,size): file_handle=open(fullpath,'rb') file_handle_readinto=file_handle.readinto except Exception as e: - self.log.error(e) + #self.log.error(e) + print(e) return None - else: - hasher = sha1() - hasher_update=hasher.update - #faster for smaller files - if size