Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24990 & 24991 - Use Google buckets instead of SFTP #1867

Merged
merged 21 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions jobs/ftp-poller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class _Config(object): # pylint: disable=too-few-public-methods
GCP_AUTH_KEY = os.getenv("AUTHPAY_GCP_AUTH_KEY", None)
PUB_ENABLE_MESSAGE_ORDERING = os.getenv("PUB_ENABLE_MESSAGE_ORDERING", "True")

FTP_POLLER_BUCKET_NAME = os.getenv("FTP_POLLER_BUCKET_NAME", "")
GOOGLE_STORAGE_SA = os.getenv("GOOGLE_STORAGE_SA", "")

TESTING = False
DEBUG = True

Expand Down
3 changes: 3 additions & 0 deletions jobs/ftp-poller/devops/vaults.ocp.env
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ SENTRY_DSN=op://sentry/$APP_ENV/relationship-api/SENTRY_DSN

FTP_POLLER_TOPIC=op://gcp-queue/$APP_ENV/topics/FTP_POLLER_TOPIC
AUTHPAY_GCP_AUTH_KEY=op://gcp-queue/$APP_ENV/gtksf3/AUTHPAY_GCP_AUTH_KEY

# The exported key must match what config.py reads: os.getenv("FTP_POLLER_BUCKET_NAME").
FTP_POLLER_BUCKET_NAME="op://buckets/$APP_ENV/pay/FTP_POLLER_PENDING_BUCKET_NAME"
GOOGLE_STORAGE_SA="op://buckets/$APP_ENV/pay/GOOGLE_STORAGE_SA"
27 changes: 16 additions & 11 deletions jobs/ftp-poller/invoke_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,26 @@ def run(job_name):
from tasks.cas_poller_ftp import CASPollerFtpTask
from tasks.cgi_feeder_poller_task import CGIFeederPollerTask
from tasks.eft_poller_ftp import EFTPollerFtpTask
from tasks.google_bucket_poller import GoogleBucketPollerTask

application = create_app()

application.app_context().push()
if job_name == "CAS_FTP_POLLER":
CASPollerFtpTask.poll_ftp()
application.logger.info(f"<<<< Completed Polling CAS FTP >>>>")
elif job_name == "CGI_FTP_POLLER":
CGIFeederPollerTask.poll_ftp()
application.logger.info(f"<<<< Completed Polling CGI FTP >>>>")
elif job_name == "EFT_FTP_POLLER":
EFTPollerFtpTask.poll_ftp()
application.logger.info(f"<<<< Completed Polling EFT FTP >>>>")
else:
application.logger.debug("No valid args passed.Exiting job without running any ***************")
match job_name:
case "CAS_FTP_POLLER":
CASPollerFtpTask.poll_ftp()
application.logger.info("<<<< Completed Polling CAS FTP >>>>")
case "CGI_FTP_POLLER":
CGIFeederPollerTask.poll_ftp()
application.logger.info("<<<< Completed Polling CGI FTP >>>>")
case "EFT_FTP_POLLER":
EFTPollerFtpTask.poll_ftp()
application.logger.info("<<<< Completed Polling EFT FTP >>>>")
case "GOOGLE_BUCKET_POLLER":
GoogleBucketPollerTask.poll_google_bucket_for_ejv_files()
application.logger.info("<<<< Completed Polling Google Buckets >>>>")
case _:
application.logger.debug("No valid args passed.Exiting job without running any ***************")


if __name__ == "__main__":
Expand Down
100 changes: 99 additions & 1 deletion jobs/ftp-poller/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions jobs/ftp-poller/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ launchdarkly-server-sdk = "^8.2.1"
sbc-common-components = {git = "https://github.com/bcgov/sbc-common-components.git", subdirectory = "python"}
pay-api = {git = "https://github.com/bcgov/sbc-pay.git", branch = "main", subdirectory = "pay-api"}
wheel = "^0.43.0"
google-cloud-storage = "^2.19.0"


[tool.poetry.group.dev.dependencies]
Expand Down
86 changes: 86 additions & 0 deletions jobs/ftp-poller/tasks/google_bucket_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Google Bucket Poller."""

import base64
import json
import os
import tempfile
import traceback

from flask import current_app
from google.cloud import storage

from services.sftp import SFTPService


class GoogleBucketPollerTask:
    """Forward files that pay-jobs dropped in the Google bucket to CGI over SFTP.

    Relies on pay-jobs creating files under the ``cgi_processing/`` prefix of the
    bucket. Each file is downloaded to the local temp directory, uploaded via
    SFTP, and then moved to the ``cgi_processed/`` prefix.
    """

    # Both are populated by initialize_storage_client(); they live on the class
    # because every method here is a classmethod.
    client: storage.Client = None
    bucket: storage.Bucket = None

    @classmethod
    def poll_google_bucket_for_ejv_files(cls):
        """Check google bucket for ejv files that PAY-JOBS has created.

        Runs the full cycle: initialize the client, download the pending files,
        upload them over SFTP, then move the blobs to the processed prefix.
        Any exception is logged (message plus stack trace) and not re-raised.
        """
        try:
            current_app.logger.info('Polling Google bucket for EJV files.')
            cls.initialize_storage_client()
            file_paths = cls._get_processing_files()
            cls._upload_to_sftp(file_paths)
            cls._move_to_processed_folder(file_paths)
        except Exception as e:  # NOQA # pylint: disable=broad-except
            current_app.logger.error(f"{{error: {str(e)}, stack_trace: {traceback.format_exc()}}}")

    @classmethod
    def initialize_storage_client(cls):
        """Initialize google storage client / bucket for use.

        Reads ``FTP_POLLER_BUCKET_NAME`` and the base64-encoded service account
        JSON ``GOOGLE_STORAGE_SA`` from the Flask config and stores the resulting
        client and bucket on the class.

        Raises:
            ValueError: if ``GOOGLE_STORAGE_SA`` is missing or empty.
        """
        current_app.logger.info("Initializing Google Storage client.")
        ftp_poller_bucket_name = current_app.config.get("FTP_POLLER_BUCKET_NAME")
        encoded_service_account = current_app.config.get("GOOGLE_STORAGE_SA")
        if not encoded_service_account:
            # Fail with a readable message instead of an opaque base64/JSON decode error.
            raise ValueError("GOOGLE_STORAGE_SA is not configured; cannot initialize Google Storage client.")
        json_text = base64.b64decode(encoded_service_account).decode("utf-8")
        # strict=False tolerates control characters inside the JSON strings.
        auth_json = json.loads(json_text, strict=False)
        cls.client = storage.Client.from_service_account_info(auth_json)
        cls.bucket = cls.client.bucket(ftp_poller_bucket_name)

    @classmethod
    def _get_processing_files(cls):
        """Download all files to temp folder, so they can be SFTP'd and moved to processed.

        Returns:
            A list of local file paths, one per blob found under ``cgi_processing/``.
        """
        file_paths = []
        for blob in cls.bucket.list_blobs(prefix="cgi_processing/"):
            # Skip if folder.
            if blob.name.endswith("/"):
                continue
            # Only the last path segment is kept, so files land flat in the temp dir.
            target_path = os.path.join(tempfile.gettempdir(), os.path.basename(blob.name))
            current_app.logger.info(f"Downloading {blob.name} to {target_path}")
            # This automatically overrides if the file exists.
            blob.download_to_filename(target_path)
            file_paths.append(target_path)
        current_app.logger.info(f"Processing files: {file_paths}")
        return file_paths

    @classmethod
    def _move_to_processed_folder(cls, file_paths):
        """Move files from processing to processed folder.

        Args:
            file_paths: local paths returned by ``_get_processing_files``; only the
                base name is used to locate the blob under ``cgi_processing/``.
        """
        for file_path in file_paths:
            file_name = os.path.basename(file_path)
            current_app.logger.info(f"Moving {file_name} from processing to processed folder.")
            source_blob = cls.bucket.blob(f"cgi_processing/{file_name}")
            destination = f"cgi_processed/{file_name}"
            # The move is done as a copy followed by a delete of the source blob.
            cls.bucket.copy_blob(source_blob, cls.bucket, destination)
            source_blob.delete()
            current_app.logger.info(f"File moved to {destination}.")

    @classmethod
    def _upload_to_sftp(cls, file_paths):
        """Handle SFTP upload for these files.

        Uploads each local file into ``CGI_SFTP_DIRECTORY`` on the "CGI" SFTP
        connection and deletes the local copy once its upload has succeeded.
        Does nothing (and opens no connection) when ``file_paths`` is empty.

        Args:
            file_paths: local paths of the files to upload.
        """
        if not file_paths:
            return
        current_app.logger.info("Uploading files via SFTP to CAS.")
        ftp_dir: str = current_app.config.get("CGI_SFTP_DIRECTORY")
        with SFTPService.get_connection("CGI") as sftp_client:
            # Change directory once up front: repeating chdir per file is redundant
            # and would fail on the second file if the directory is a relative path.
            sftp_client.chdir(ftp_dir)
            for file_path in file_paths:
                current_app.logger.info(f"Uploading file: {file_path}")
                target_file = os.path.basename(file_path)
                sftp_client.put(file_path, target_file)
                current_app.logger.info(f"Uploaded file from: {file_path} to {ftp_dir}/{target_file}")
                os.remove(file_path)
        current_app.logger.info("Uploading files via SFTP done.")
3 changes: 3 additions & 0 deletions jobs/payment-jobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ class _Config(object): # pylint: disable=too-few-public-methods
EFT_TRANSFER_DESC = os.getenv("EFT_TRANSFER_DESC", "BCREGISTRIES {} {} EFT TRANSFER")
EFT_OVERDUE_NOTIFY_EMAILS = os.getenv("EFT_OVERDUE_NOTIFY_EMAILS", "")

FTP_POLLER_BUCKET_NAME = os.getenv("FTP_POLLER_BUCKET_NAME", "")
GOOGLE_STORAGE_SA = os.getenv("GOOGLE_STORAGE_SA", "")


class DevConfig(_Config): # pylint: disable=too-few-public-methods
TESTING = False
Expand Down
2 changes: 2 additions & 0 deletions jobs/payment-jobs/devops/vaults.gcp.env
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,5 @@ DW_PASSWORD="op://database/$APP_ENV/fin-warehouse/DB_READONLY_PASSWORD"
DW_HOST="op://database/$APP_ENV/fin-warehouse/DATABASE_HOST"
DW_PORT="op://database/$APP_ENV/fin-warehouse/DATABASE_PORT"
PAY_CONNECTOR_AUTH="op://relationship/$APP_ENV/pay-api/PAY_CONNECTOR_AUTH"
# The exported key must match what config.py reads: os.getenv("FTP_POLLER_BUCKET_NAME").
FTP_POLLER_BUCKET_NAME="op://buckets/$APP_ENV/pay/FTP_POLLER_PENDING_BUCKET_NAME"
GOOGLE_STORAGE_SA="op://buckets/$APP_ENV/pay/GOOGLE_STORAGE_SA"
Loading