Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

19513 - Add CSV Feedback monitoring #1598

Merged
merged 4 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pay-queue/devops/vaults.gcp.env
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ SENTRY_DSN="op://sentry/$APP_ENV/relationship-api/SENTRY_DSN"
LEGISLATIVE_TIMEZONE="op://relationship/$APP_ENV/pay-api/LEGISLATIVE_TIMEZONE"
ACCOUNT_SECRET_KEY="op://relationship/$APP_ENV/pay-api/ACCOUNT_SECRET_KEY"
DISABLE_EJV_ERROR_EMAIL="True"
DISABLE_CSV_ERROR_EMAIL="True"
IT_OPS_EMAIL='[email protected]'
NOTIFY_API_URL="op://API/$APP_ENV/notify-api/NOTIFY_API_URL"
NOTIFY_API_VERSION="op://API/$APP_ENV/notify-api/NOTIFY_API_VERSION"
35 changes: 35 additions & 0 deletions pay-queue/src/pay_queue/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright © 2024 Province of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Jobs uses service accounts to retrieve the token."""
import base64

from flask import current_app
from pay_api.services.oauth_service import OAuthService
from pay_api.utils.enums import AuthHeaderType, ContentType


def get_token():
"""Get service account token."""
issuer_url = current_app.config.get('JWT_OIDC_ISSUER')

token_url = issuer_url + '/protocol/openid-connect/token'
# https://sso-dev.pathfinder.gov.bc.ca/auth/realms/fcf0kpqr/protocol/openid-connect/token
seeker25 marked this conversation as resolved.
Show resolved Hide resolved
basic_auth_encoded = base64.b64encode(
bytes(current_app.config.get('KEYCLOAK_SERVICE_ACCOUNT_ID') + ':' + current_app.config.get(
'KEYCLOAK_SERVICE_ACCOUNT_SECRET'), 'utf-8')).decode('utf-8')
data = 'grant_type=client_credentials'
token_response = OAuthService.post(token_url, basic_auth_encoded,
AuthHeaderType.BASIC, ContentType.FORM_URL_ENCODED, data)
token = token_response.json().get('access_token')
return token
8 changes: 7 additions & 1 deletion pay-queue/src/pay_queue/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,13 @@ class _Config(): # pylint: disable=too-few-public-methods,protected-access
# Secret key for encrypting bank account
ACCOUNT_SECRET_KEY = os.getenv('ACCOUNT_SECRET_KEY')

# Disable EJV Error Email
NOTIFY_API_URL = os.getenv('NOTIFY_API_URL', '')
NOTIFY_API_VERSION = os.getenv('NOTIFY_API_VERSION', '')
NOTIFY_API_ENDPOINT = f'{NOTIFY_API_URL + NOTIFY_API_VERSION}/'
IT_OPS_EMAIL = os.getenv('IT_OPS_EMAIL', '[email protected]')

DISABLE_EJV_ERROR_EMAIL = os.getenv('DISABLE_EJV_ERROR_EMAIL', 'true').lower() == 'true'
DISABLE_CSV_ERROR_EMAIL = os.getenv('DISABLE_CSV_ERROR_EMAIL', 'true').lower() == 'true'

# PUB/SUB - PUB: account-mailer-dev, auth-event-dev, SUB to ftp-poller-payment-reconciliation-dev, business-events
ACCOUNT_MAILER_TOPIC = os.getenv('ACCOUNT_MAILER_TOPIC', 'account-mailer-dev')
Expand Down Expand Up @@ -159,6 +164,7 @@ class TestConfig(_Config): # pylint: disable=too-few-public-methods
TEST_PUSH_ENDPOINT = os.getenv('TEST_PUSH_ENDPOINT', f'http://host.docker.internal:{str(TEST_PUSH_ENDPOINT_PORT)}/')
GCP_AUTH_KEY = None
DISABLE_EJV_ERROR_EMAIL = False
DISABLE_CSV_ERROR_EMAIL = False


class ProdConfig(_Config): # pylint: disable=too-few-public-methods
Expand Down
2 changes: 1 addition & 1 deletion pay-queue/src/pay_queue/resources/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def worker():
try:
current_app.logger.info('Event Message Received: %s ', json.dumps(dataclasses.asdict(ce)))
if ce.type == QueueMessageTypes.CAS_MESSAGE_TYPE.value:
reconcile_payments(ce.data)
reconcile_payments(ce)
elif ce.type == QueueMessageTypes.CGI_ACK_MESSAGE_TYPE.value:
reconcile_distributions(ce.data)
elif ce.type == QueueMessageTypes.CGI_FEEDBACK_MESSAGE_TYPE.value:
Expand Down
131 changes: 97 additions & 34 deletions pay-queue/src/pay_queue/services/payment_reconciliations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# limitations under the License.
"""Payment reconciliation file."""
import csv
import dataclasses
import json
import os
from datetime import datetime
from decimal import Decimal
from typing import Dict, List, Tuple

from flask import current_app
from jinja2 import Environment, FileSystemLoader
from pay_api.models import CasSettlement as CasSettlementModel
from pay_api.models import CfsAccount as CfsAccountModel
from pay_api.models import Credit as CreditModel
Expand All @@ -35,20 +38,24 @@
from pay_api.services.cfs_service import CFSService
from pay_api.services.gcp_queue_publisher import QueueMessage
from pay_api.services.non_sufficient_funds import NonSufficientFundsService
from pay_api.services.oauth_service import OAuthService
from pay_api.services.payment_transaction import PaymentTransaction as PaymentTransactionService
from pay_api.utils.enums import (
CfsAccountStatus, InvoiceReferenceStatus, InvoiceStatus, LineItemStatus, PaymentMethod, PaymentStatus, QueueSources)
AuthHeaderType, CfsAccountStatus, ContentType, InvoiceReferenceStatus, InvoiceStatus, LineItemStatus, PaymentMethod,
PaymentStatus, QueueSources)
from pay_api.utils.util import get_topic_for_corp_type
from sbc_common_components.utils.enums import QueueMessageTypes
from sentry_sdk import capture_message

from pay_queue import config
from pay_queue.auth import get_token
from pay_queue.minio import get_object

from ..enums import Column, RecordType, SourceTransaction, Status, TargetTransaction


APP_CONFIG = config.get_named_config(os.getenv('DEPLOYMENT_ENV', 'production'))
ENV = Environment(loader=FileSystemLoader('.'), autoescape=True)


def _create_payment_records(csv_content: str):
Expand Down Expand Up @@ -174,7 +181,7 @@ def _get_payment_by_inv_number_and_status(inv_number: str, status: str) -> Payme
return payment


def reconcile_payments(msg: Dict[str, any]):
def reconcile_payments(ce):
"""Read the file and update payment details.

1: Check to see if file has been processed already.
Expand All @@ -186,6 +193,7 @@ def reconcile_payments(msg: Dict[str, any]):
3.3 : If transaction status is PARTIAL, update payment and invoice status, publish to account mailer.
4: If the transaction is On Account for Credit, apply the credit to the account.
"""
msg = ce.data
file_name: str = msg.get('fileName')
minio_location: str = msg.get('location')

Expand All @@ -200,6 +208,18 @@ def reconcile_payments(msg: Dict[str, any]):

file = get_object(minio_location, file_name)
content = file.data.decode('utf-8-sig')

error_messages = []
has_errors, error_messages = _process_file_content(content, cas_settlement, msg, error_messages)

if has_errors and not APP_CONFIG.DISABLE_CSV_ERROR_EMAIL:
_send_error_email(file_name, minio_location, error_messages, msg, cas_settlement.__tablename__)


def _process_file_content(content: str, cas_settlement: CasSettlementModel,
msg: Dict[str, any], error_messages: List[Dict[str, any]]):
"""Process the content of the feedback file."""
has_errors = False
# Iterate the rows and create key value pair for each row
for row in csv.DictReader(content.splitlines()):
# Convert lower case keys to avoid any key mismatch
Expand All @@ -220,18 +240,19 @@ def reconcile_payments(msg: Dict[str, any]):
# PS : Duplicating some code to make the code more readable.
if record_type in pad_record_types:
# Handle invoices
_process_consolidated_invoices(row)
has_errors = _process_consolidated_invoices(row, error_messages) or has_errors
elif record_type in (RecordType.BOLP.value, RecordType.EFTP.value):
# EFT, WIRE and Online Banking are one-to-one invoice. So handle them in same way.
_process_unconsolidated_invoices(row)
has_errors = _process_unconsolidated_invoices(row, error_messages) or has_errors
elif record_type in (RecordType.ONAC.value, RecordType.CMAP.value, RecordType.DRWP.value):
_process_credit_on_invoices(row)
has_errors = _process_credit_on_invoices(row, error_messages) or has_errors
elif record_type == RecordType.ADJS.value:
current_app.logger.info('Adjustment received for %s.', msg)
else:
# For any other transactions like DM log error and continue.
current_app.logger.error('Record Type is received as %s, and cannot process %s.', record_type, msg)
capture_message(f'Record Type is received as {record_type}, and cannot process {msg}.', level='error')
error_msg = f'Record Type is received as {record_type}, and cannot process {msg}.'
_csv_error_handling(row, error_msg, error_messages)
has_errors = True
# Continue processing

# Commit the transaction and process next row.
Expand All @@ -247,9 +268,46 @@ def reconcile_payments(msg: Dict[str, any]):

cas_settlement.processed_on = datetime.now()
cas_settlement.save()
return has_errors, error_messages


def _send_error_email(file_name: str, minio_location: str,
error_messages: List[Dict[str, any]], ce, table_name: str):
"""Send the email asynchronously, using the given details."""
subject = 'Payment Reconciliation Failure'
token = get_token()
recipient = APP_CONFIG.IT_OPS_EMAIL
template = ENV.get_template('src/pay_queue/templates/statement_notification.html')
seeker25 marked this conversation as resolved.
Show resolved Hide resolved
params = {
seeker25 marked this conversation as resolved.
Show resolved Hide resolved
'fileName': file_name,
'errorMessages': error_messages,
'minioLocation': minio_location,
'payload': json.dumps(dataclasses.asdict(ce)),
'table_name': table_name
}
html_body = template.render(params)
current_app.logger.info(f'_send_error_email to recipients: {recipient}')
notify_url = current_app.config.get('NOTIFY_API_ENDPOINT') + 'notify/'
notify_body = {
'recipient': recipient,
'content': {
'subject': subject,
'body': html_body
}
}
notify_response = OAuthService.post(notify_url, token=token,
auth_header_type=AuthHeaderType.BEARER,
content_type=ContentType.JSON, data=notify_body)
if notify_response:
response_json = json.loads(notify_response.text)
if response_json.get('notifyStatus', 'FAILURE') != 'FAILURE':
current_app.logger.info('_send_error_email notify_response')
else:
current_app.logger.info('_send_error_email failed')


def _process_consolidated_invoices(row):
def _process_consolidated_invoices(row, error_messages: List[Dict[str, any]]) -> bool:
has_errors = False
target_txn_status = _get_row_value(row, Column.TARGET_TXN_STATUS)
if (target_txn := _get_row_value(row, Column.TARGET_TXN)) == TargetTransaction.INV.value:
inv_number = _get_row_value(row, Column.TARGET_TXN_NO)
Expand All @@ -268,11 +326,10 @@ def _process_consolidated_invoices(row):
)

if not inv_references and not completed_inv_references:
current_app.logger.error('No invoice found for %s in the system, and cannot process %s.',
inv_number, row)
capture_message(f'No invoice found for {inv_number} in the system, and cannot process {row}.',
level='error')
return
error_msg = f'No invoice found for {inv_number} in the system, and cannot process {row}.'
_csv_error_handling(row, error_msg, error_messages)
has_errors = True
return has_errors
_process_paid_invoices(inv_references, row)
elif target_txn_status.lower() == Status.NOT_PAID.value.lower() \
or record_type in (RecordType.PADR.value, RecordType.PAYR.value):
Expand All @@ -282,10 +339,10 @@ def _process_consolidated_invoices(row):
# Send mailer and account events to update status and send email notification
_publish_account_events(QueueMessageTypes.NSF_LOCK_ACCOUNT.value, payment_account, row)
else:
current_app.logger.error('Target Transaction Type is received as %s for PAD, and cannot process %s.',
target_txn, row)
capture_message(
f'Target Transaction Type is received as {target_txn} for PAD, and cannot process.', level='error')
error_msg = f'Target Transaction Type is received as {target_txn} for PAD, and cannot process {row}.'
_csv_error_handling(row, error_msg, error_messages)
has_errors = True
return has_errors


def _find_invoice_reference_by_number_and_status(inv_number: str, status: str):
Expand All @@ -296,7 +353,8 @@ def _find_invoice_reference_by_number_and_status(inv_number: str, status: str):
return inv_references


def _process_unconsolidated_invoices(row):
def _process_unconsolidated_invoices(row, error_messages: List[Dict[str, any]]) -> bool:
has_errors = False
target_txn_status = _get_row_value(row, Column.TARGET_TXN_STATUS)
record_type = _get_row_value(row, Column.RECORD_TYPE)
if (target_txn := _get_row_value(row, Column.TARGET_TXN)) == TargetTransaction.INV.value:
Expand All @@ -317,11 +375,10 @@ def _process_unconsolidated_invoices(row):
current_app.logger.info('Found %s completed invoice references for invoice number %s',
len(completed_inv_references), inv_number)
if len(completed_inv_references) != 1:
current_app.logger.error('More than one or none invoice reference received '
'for invoice number %s for %s', inv_number, record_type)
capture_message(
f'More than one or none invoice reference received for invoice number {inv_number} for '
f'{record_type}', level='error')
error_msg = (f'More than one or none invoice reference '
f'received for invoice number {inv_number} for {record_type}')
_csv_error_handling(row, error_msg, error_messages)
has_errors = True
else:
# Handle fully PAID and Partially Paid scenarios.
if target_txn_status.lower() == Status.PAID.value.lower():
Expand All @@ -332,14 +389,21 @@ def _process_unconsolidated_invoices(row):
# As per validation above, get first and only inv ref
_process_partial_paid_invoices(inv_references[0], row)
else:
current_app.logger.error('Target Transaction Type is received as %s for %s, and cannot process.',
target_txn, record_type)
capture_message(
f'Target Transaction Type is received as {target_txn} for {record_type}, and cannot process.',
level='error')
error_msg = (f'Target Transaction Type is received '
f'as {target_txn} for {record_type}, and cannot process.')
_csv_error_handling(row, error_msg, error_messages)
has_errors = True
return has_errors


def _csv_error_handling(row, error_msg: str, error_messages: List[Dict[str, any]]):
current_app.logger.error(error_msg)
capture_message(error_msg, level='error')
error_messages.append({'error': error_msg, 'row': row})


def _process_credit_on_invoices(row):
def _process_credit_on_invoices(row, error_messages: List[Dict[str, any]]) -> bool:
has_errors = False
# Credit memo can happen for any type of accounts.
target_txn_status = _get_row_value(row, Column.TARGET_TXN_STATUS)
if _get_row_value(row, Column.TARGET_TXN) == TargetTransaction.INV.value:
Expand All @@ -358,11 +422,10 @@ def _process_credit_on_invoices(row):
current_app.logger.info('Partially PAID using credit memo. '
'Ignoring as the credit memo payment is already captured.')
else:
current_app.logger.error('Target Transaction status is received as %s for CMAP, and cannot process.',
target_txn_status)
capture_message(
f'Target Transaction status is received as {target_txn_status} for CMAP, and cannot process.',
level='error')
error_msg = f'Target Transaction status is received as {target_txn_status} for CMAP, and cannot process.'
_csv_error_handling(row, error_msg, error_messages)
has_errors = True
return has_errors


def _process_paid_invoices(inv_references, row):
Expand Down
6 changes: 6 additions & 0 deletions pay-queue/src/pay_queue/templates/csv_failed_email.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The following CSV feedback failed:
seeker25 marked this conversation as resolved.
Show resolved Hide resolved
Feedback File Name: {{ file_name }}
seeker25 marked this conversation as resolved.
Show resolved Hide resolved
Feedback File Location: {{ minio_location }}
Table to update: {{ table_name }}
Errors: {{ error_messages }}
Payload: {{ payload }}
Loading
Loading