Skip to content

Commit

Permalink
crc calculation, working timeout on CD extraction with tree killing, …
Browse files Browse the repository at this point in the history
…work in progress
  • Loading branch information
piotrj committed Nov 18, 2023
1 parent de50e41 commit 70cdb6a
Show file tree
Hide file tree
Showing 4 changed files with 430 additions and 96 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ appdirs==1.4.4
send2trash==1.8.2
zstandard==0.21.0
ordered-set==4.1.0
psutil==5.9.6
265 changes: 211 additions & 54 deletions src/core.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,36 @@
#!/usr/bin/python3

####################################################################################
#
# Copyright (c) 2023 Piotr Jochymek
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
####################################################################################

from os import scandir
from os import stat
from os import sep
from os import getpgid
from os import killpg

from os.path import join as path_join
from os.path import abspath
from os.path import normpath
Expand All @@ -13,6 +41,8 @@
from re import search
from sys import getsizeof

from hashlib import sha1

from collections import defaultdict

import re
Expand All @@ -29,7 +59,10 @@

import difflib

from subprocess import STDOUT, check_output, Popen, TimeoutExpired, PIPE
from executor import Executor

from subprocess import STDOUT, TimeoutExpired, PIPE, check_output
#, Popen

import pathlib

Expand Down Expand Up @@ -102,7 +135,7 @@ def test_regexp(expr):
#print(i, temp_tuple)

#######################################################################
data_format_version='1.0001'
data_format_version='1.0002'

class LibrerCoreData :
label = ""
Expand Down Expand Up @@ -151,12 +184,57 @@ def __init__(self,label,path,log):
self.abort_action = False
self.files_search_progress = 0

self.crc_progress_info=0

def file_name(self):
    """Return the data file name for this record: the db record id plus '.dat'."""
    record_id = self.db.rid
    return f'{record_id}.dat'

def abort(self):
    """Raise the abort flag that the long-running operations of this object poll."""
    self.abort_action = True

CRC_BUFFER_SIZE=4*1024*1024
def calc_crc(self,fullpath,size):
    """Return the binary SHA-1 digest of the file at *fullpath*.

    fullpath -- path of the file to hash
    size     -- expected file size in bytes; when smaller than the read
                buffer, the file is hashed with a single read

    Returns the digest as bytes, or None when the file cannot be opened
    (the error is logged) or when self.abort_action is set.

    While a large file is being read, self.crc_progress_info accumulates
    the number of bytes hashed so far; it is reset to 0 before returning.
    """
    # CRC_BUFFER_SIZE is a class attribute, so it must be reached through
    # self - a bare name lookup inside the method raises NameError
    buffer_size = self.CRC_BUFFER_SIZE

    buf = bytearray(buffer_size)
    view = memoryview(buf)

    self.crc_progress_info=0

    try:
        file_handle=open(fullpath,'rb')
    except Exception as e:
        self.log.error(e)
        return None

    hasher = sha1()
    hasher_update=hasher.update

    try:
        # the context manager closes the file even if a read raises
        with file_handle:
            file_handle_readinto=file_handle.readinto

            #faster for smaller files
            if size<buffer_size:
                hasher_update(view[:file_handle_readinto(buf)])
            else:
                while rsize := file_handle_readinto(buf):
                    hasher_update(view[:rsize])

                    if rsize==buffer_size:
                        #still reading
                        self.crc_progress_info+=rsize

                    if self.abort_action:
                        break
    finally:
        self.crc_progress_info=0

    #only complete result
    if self.abort_action:
        return None

    return hasher.digest()

def scan_rec(self, path, dictionary,check_dev=True,dev_call=None) :
if self.abort_action:
return True
Expand Down Expand Up @@ -294,7 +372,15 @@ def tupelize_rec(self,dictionary):

elif len_items_list==8:
cd = items_list[7]
cd_ok,is_compressed,output = cd

cd_len = len(cd)
if cd_len==3:
cd_ok,is_compressed,output = cd
elif cd_len==4:
cd_ok,is_compressed,output,crc = cd
else:
print('lewizna crc:',cd)
continue

has_cd = True

Expand Down Expand Up @@ -348,7 +434,7 @@ def prepare_custom_data_pool_rec(self,dictionary,parent_path):
matched = False

rule_nr=-1
for expressions,use_smin,smin_int,use_smax,smax_int,executable,time_int in self_db_cde_list:
for expressions,use_smin,smin_int,use_smax,smax_int,executable,timeout,crc in self_db_cde_list:
if self.abort_action:
break
if matched:
Expand Down Expand Up @@ -378,10 +464,11 @@ def prepare_custom_data_pool_rec(self,dictionary,parent_path):

except Exception as e:
self.log.error('prepare_custom_data_pool_rec error::%s',e )
print(e,entry_name,is_dir,is_file,is_symlink,is_bind,size,mtime)
print('prepare_custom_data_pool_rec',e,entry_name,is_dir,is_file,is_symlink,is_bind,size,mtime)

def get_cd_text(self,cd_data,is_compressed):
    """Return custom data in its readable form.

    cd_data       -- stored custom data payload
    is_compressed -- when true, cd_data is gzip-decompressed and the result
                     decoded as ISO-8859-1; otherwise cd_data is returned as is
    """
    if not is_compressed:
        return cd_data

    raw_bytes = gzip.decompress(cd_data)
    return raw_bytes.decode("ISO-8859-1")

def extract_custom_data(self):
Expand All @@ -397,88 +484,153 @@ def extract_custom_data(self):
self_db.files_cde_quant_sum = len(self.custom_data_pool)

self_db_cde_list = self_db.cde_list

exe = Executor()

for (list_ref,subpath,rule_nr) in self.custom_data_pool.values():
if self.abort_action:
break

expressions,use_smin,smin_int,use_smax,smax_int,executable,time_int = self_db_cde_list[rule_nr]
expressions,use_smin,smin_int,use_smax,smax_int,executable,timeout,crc = self_db_cde_list[rule_nr]

full_file_path = normpath(abspath(sep.join([scan_path,subpath])))

size = list_ref[4]

cde_run_list = executable + [full_file_path]

if crc:
self.info_line_current = f'{subpath} CRC calculation ({bytes_to_str(size)})'
crc_val = self.calc_crc(full_file_path,size)
print(crc_val)

self.info_line_current = f'{subpath} ({bytes_to_str(size)})'
try:
shell = False
timeout = time_int
#output = ''
#output = check_output(cde_run_list, stderr=STDOUT, timeout=timeout,shell=shell,encoding="ISO-8859-1")
#text=True,

p = Popen(cde_run_list, start_new_session=True, stdout=PIPE, stderr=PIPE)
cd_ok,output = exe.run(cde_run_list,timeout)

if timeout:
p.wait(timeout=timeout)
if cd_ok:
output_len = len(output)

except TimeoutExpired as et:
print('timeout on ',cde_run_list)
killpg(getpgid(p.pid), SIGTERM)
self.log.error('Custom Data Extraction subprocess timeout:%s\n%s',cde_run_list,et )
if output_len==0:
result = None
is_compressed = False
elif output_len>128:
result = gzip.compress(bytes(output,"ISO-8859-1")) #"utf-8"
#result = gzip.compress(output) #"utf-8"
is_compressed = True
else:
result = output
#.decode("ISO-8859-1")
is_compressed = False
new_list_ref_elem = [cd_ok,is_compressed,result]

cd_ok = False
else:
is_compressed = False

e_str = str(et)
e_size = getsizeof(e_str)
list_ref.append( (cd_ok,is_compressed,e_str) )
new_list_ref_elem = [cd_ok,is_compressed,output]
self_db.files_cde_errors_quant +=1
self_db.files_cde_size += e_size

except Exception as e:
print('error on ',cde_run_list)
self.log.error('Custom Data Extraction subprocess error:%s\n%s',cde_run_list,e )
if crc:
new_list_ref_elem.append(crc_val)

cd_ok = False
is_compressed = False
list_ref.append( tuple(new_list_ref_elem) )

e_str = str(e)
e_size = getsizeof(e_str)
list_ref.append( (cd_ok,is_compressed,e_str) )
print(e_str)
self_db.files_cde_quant += 1
self_db.files_cde_size += size
self_db.files_cde_size_extracted += getsizeof(output)

self_db.files_cde_errors_quant +=1
#try:
#shell = False
#output = check_output(cde_run_list, stderr=STDOUT, timeout=timeout,shell=shell,start_new_session=True)
#encoding="ISO-8859-1"
#text=True,

self_db.files_cde_size += e_size
else:
output, error = p.communicate()
#process = Popen(cde_run_list, start_new_session=True, stdout=PIPE, stderr=STDOUT)

#if timeout:
# process.wait(timeout=timeout)
#else:
# process.wait()

#except TimeoutExpired as et:
#print('timeout on ',cde_run_list)

#try:
# process.terminate()
#except Exception as term_e:
# self.log.error('Custom Data Extraction subprocess timeout termination:%s\n%s',cde_run_list,term_e )
# e_str = str(et) + '\n' + str(term_e)
#else:

#e_str = str(et)

#killpg(getpgid(process.pid), SIGTERM)
#self.log.error('Custom Data Extraction subprocess timeout:%s\n%s',cde_run_list,et )

#cd_ok = False
#is_compressed = False

#e_size = getsizeof(e_str)
#new_list_ref_elem = [cd_ok,is_compressed,e_str]
#list_ref.append( (cd_ok,is_compressed,e_str) )
#self_db.files_cde_errors_quant +=1
#self_db.files_cde_size += e_size

#except Exception as e:
#print('error on ',cde_run_list)
# self.log.error('Custom Data Extraction subprocess error:%s\n%s',cde_run_list,e )

# cd_ok = False
# is_compressed = False

# e_str = str(e)
# e_size = getsizeof(e_str)

# new_list_ref_elem = [cd_ok,is_compressed,e_str]
#list_ref.append( (cd_ok,is_compressed,e_str) )
#print(e_str)

# self_db.files_cde_errors_quant +=1

# self_db.files_cde_size += e_size
#else:
#returncode = process.returncode
#print('returncode:',returncode)

#output, error = process.communicate()
#print(output,type(output))

cd_ok = True
#cd_ok = True

output_len = len(output)
#output_len = len(output)

if output_len==0:
result = None
is_compressed = False
elif output_len>128:
#if output_len==0:
# result = None
# is_compressed = False
#elif output_len>128:
#result = gzip.compress(bytes(output,"ISO-8859-1")) #"utf-8"
result = gzip.compress(output) #"utf-8"
is_compressed = True
else:
result = output
is_compressed = False
# result = gzip.compress(output) #"utf-8"
# is_compressed = True
#else:
# result = output.decode("ISO-8859-1")
# is_compressed = False

#new_list_ref_elem = [cd_ok,is_compressed,result]

list_ref.append( (cd_ok,is_compressed,result) )
#if crc:
# new_list_ref_elem.append(crc_val)

#list_ref.append( tuple(new_list_ref_elem) )

#self_db.files_cde_quant += 1
#self_db.files_cde_size += size
#self_db.files_cde_size_extracted += getsizeof(output)

self_db.files_cde_quant += 1
self_db.files_cde_size += size
self_db.files_cde_size_extracted += getsizeof(output)
self.info_line_current = ''

del self.custom_data_pool

exe.end()

self.set_data()

self.scan_data={}
Expand Down Expand Up @@ -653,6 +805,11 @@ def load(self,db_dir,file_name):
with lzma.open(full_file_path, "rb") as gzip_file:
self.db = pickle.load(gzip_file)

global data_format_version
if self.db.data_format_version != data_format_version:
self.log.error(f'incompatible data format version error: {self.db.data_format_version} vs {data_format_version}')
return True

except Exception as e:
print('loading error:%s' % e )
return True
Expand Down
Loading

0 comments on commit 70cdb6a

Please sign in to comment.