Skip to content

Commit

Permalink
Split de la méthode get_latest_encrypted_filenames en deux
Browse files Browse the repository at this point in the history
  • Loading branch information
alanzirek committed Mar 2, 2025
1 parent cb98f4b commit 10da675
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 34 deletions.
58 changes: 27 additions & 31 deletions core/agricoll.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def missing_host_key(self, client, hostname, key):


class SftpAgricoll:
ENCRYPTED_DATA_FILE_SUFFIX = ".encrypted"
ENCRYPTED_SYMMETRIC_KEY_FILE_SUFFIX = ".key.encrypted"

def __init__(self):
self.logger = logging.getLogger(__name__)
self.client: SSHClient | None = None
Expand Down Expand Up @@ -80,39 +83,32 @@ def print_files(self):
mod_time = datetime.datetime.fromtimestamp(file_attr.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
print(f"- {file_attr.filename}, Modifié: {mod_time})")

def get_latest_encrypted_filenames(self) -> tuple[str, str]:
"""Trouve les fichiers chiffrés (fichier de données et la clé symétrique) les plus récents sur le serveur SFTP"""
def get_latest_encrypted_data_filename(self) -> str:
"""Trouve le fichier de données chiffré (.encrypted) le plus récent."""
file_list = self.sftp.listdir_attr()
encrypted_files = [
f
for f in file_list
if f.filename.endswith(self.ENCRYPTED_DATA_FILE_SUFFIX)
and not f.filename.endswith(self.ENCRYPTED_SYMMETRIC_KEY_FILE_SUFFIX)
]
try:
return max(encrypted_files, key=lambda f: f.st_mtime).filename
except ValueError:
raise FileNotFoundError(
f"Aucun fichier de données chiffré ({self.ENCRYPTED_DATA_FILE_SUFFIX}) trouvé sur le serveur SFTP"
)

if not file_list:
raise FileNotFoundError("Aucun fichier trouvé sur le serveur SFTP")

latest_encrypted_data_file_attrs = None
latest_encrypted_symmetric_key_file_attrs = None
encrypted_data_file_suffix = ".encrypted"
encrypted_symmetric_key_file_suffix = ".key.encrypted"
for file_attr in file_list:
if file_attr.filename.endswith(encrypted_data_file_suffix) and not file_attr.filename.endswith(
encrypted_symmetric_key_file_suffix
):
if (
latest_encrypted_data_file_attrs is None
or file_attr.st_mtime > latest_encrypted_data_file_attrs.st_mtime
):
latest_encrypted_data_file_attrs = file_attr
elif file_attr.filename.endswith(encrypted_symmetric_key_file_suffix):
if (
latest_encrypted_symmetric_key_file_attrs is None
or file_attr.st_mtime > latest_encrypted_symmetric_key_file_attrs.st_mtime
):
latest_encrypted_symmetric_key_file_attrs = file_attr

if not latest_encrypted_data_file_attrs:
raise FileNotFoundError("Aucun fichier de données chiffré (.encrypted) trouvé sur le serveur SFTP")
if not latest_encrypted_symmetric_key_file_attrs:
raise FileNotFoundError("Aucune clé symétrique chiffrée (.key.encrypted) trouvé sur le serveur SFTP")

return latest_encrypted_data_file_attrs.filename, latest_encrypted_symmetric_key_file_attrs.filename
def get_latest_encrypted_symmetric_key_filename(self) -> str:
"""Trouve la clé symétrique chiffrée (.key.encrypted) la plus récente."""
file_list = self.sftp.listdir_attr()
key_files = [f for f in file_list if f.filename.endswith(self.ENCRYPTED_SYMMETRIC_KEY_FILE_SUFFIX)]
try:
return max(key_files, key=lambda f: f.st_mtime).filename
except ValueError:
raise FileNotFoundError(
f"Aucune clé symétrique chiffrée ({self.ENCRYPTED_SYMMETRIC_KEY_FILE_SUFFIX}) trouvée sur le serveur SFTP"
)

def download_files(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ def handle(self, *args, **options):
sftp_agricoll = SftpAgricoll()
sftp_agricoll.connect_to_sftp()
sftp_agricoll.print_files()
latest_encrypted_data_filename, latest_encrypted_symmetric_key_filename = (
sftp_agricoll.get_latest_encrypted_filenames()
)
latest_encrypted_data_filename = sftp_agricoll.get_latest_encrypted_data_filename()
latest_encrypted_symmetric_key_filename = sftp_agricoll.get_latest_encrypted_symmetric_key_filename()
sftp_agricoll.download_files(latest_encrypted_data_filename, latest_encrypted_symmetric_key_filename)
sftp_agricoll.decrypt_symmetric_key()
contacts_agricoll_file_path = sftp_agricoll.decrypt_data_file()
Expand Down

0 comments on commit 10da675

Please sign in to comment.