Skip to content

Commit

Permalink
SFTP left to go
Browse files Browse the repository at this point in the history
  • Loading branch information
seeker25 committed Dec 20, 2024
1 parent f629161 commit fc228c3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
4 changes: 2 additions & 2 deletions jobs/ftp-poller/invoke_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def run(job_name):
from tasks.cas_poller_ftp import CASPollerFtpTask
from tasks.cgi_feeder_poller_task import CGIFeederPollerTask
from tasks.eft_poller_ftp import EFTPollerFtpTask
from tasks.google_bucket_poller import GoogleBucketPoller
from tasks.google_bucket_poller import GoogleBucketPollerTask

application = create_app()

Expand All @@ -79,7 +79,7 @@ def run(job_name):
EFTPollerFtpTask.poll_ftp()
application.logger.info("<<<< Completed Polling EFT FTP >>>>")
case "GOOGLE_BUCKET_POLLER":
GoogleBucketPoller.poll_google_bucket_for_ejv_files()
GoogleBucketPollerTask.poll_google_bucket_for_ejv_files()
application.logger.info("<<<< Completed Polling Google Buckets >>>>")
case _:
application.logger.debug("No valid args passed.Exiting job without running any ***************")
Expand Down
38 changes: 24 additions & 14 deletions jobs/ftp-poller/tasks/google_bucket_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import base64
import json
import os
import tempfile

from flask import current_app
from google.cloud import storage

from services.sftp import SFTPService


class GoogleBucketTask:
class GoogleBucketPollerTask:
"""Relies on pay-jobs creating files in the processing bucket, should move them into processed after sftp upload."""

client: storage.Client = None
Expand All @@ -21,8 +23,7 @@ def poll_google_bucket_for_ejv_files(cls):
cls.initialize_storage_client()
files = cls._get_processing_files()
cls._upload_to_sftp(files)
for file in files:
cls._move_to_processed_folder(file)
cls._move_to_processed_folder(files)

@classmethod
def initialize_storage_client(cls):
Expand All @@ -35,22 +36,30 @@ def initialize_storage_client(cls):

@classmethod
def _get_processing_files(cls):
"""List out all files, so they can be moved to processed."""
blobs = cls.bucket.list_blobs(prefix="cgi_processing/")
"""Download all files, so they can be SFTP'd and moved to processed."""
files = []
for blob in blobs:
files.append(cls.bucket.blob(blob))
current_app.logger.info(f"List of processing files: {blobs}")
for blob in cls.bucket.list_blobs(prefix="cgi_processing/"):
# Skip if folder.
if blob.name.endswith('/'):
continue
target_path = f"{tempfile.gettempdir()}/{blob.name.split('/')[-1]}"
current_app.logger.info(f"Downloading {blob.name} to {target_path}")
blob.download_to_filename(target_path)
files.append(target_path)
current_app.logger.info(f"List of processing files: {files}")
return files

@classmethod
def _move_to_processed_folder(cls, name):
def _move_to_processed_folder(cls, files):
"""Move files from processing to processed folder."""
current_app.logger.info("Moving from processing to processed folder.")
source_blob = cls.bucket.blob(f"cgi_processing/{name}")
cls.bucket.copy_blob(source_blob, cls.bucket, f"cgi_processed/{name}")
source_blob.delete()
print(f"File moved to cgi_processed/{name}")
for file in files:
file = os.path.basename(file)
current_app.logger.info(f"Moving {file} from processing to processed folder.")
source_blob = cls.bucket.blob(f"cgi_processing/{file}")
destination = f"cgi_processed/{file}"
cls.bucket.copy_blob(source_blob, cls.bucket, destination)
source_blob.delete()
current_app.logger.info(f"File moved to cgi_processed/{file}")

@classmethod
def _upload_to_sftp(cls, files):
Expand All @@ -61,3 +70,4 @@ def _upload_to_sftp(cls, files):
current_app.logger.info(f"Uploading file: {file.name}")
ftp_dir: str = current_app.config.get("CGI_SFTP_DIRECTORY")
sftp_client.put(file, ftp_dir + "/" + file.name)
current_app.logger.info(f"Uploaded file: {file.name}")

0 comments on commit fc228c3

Please sign in to comment.