diff --git a/queue_services/entity-bn/src/entity_bn/bn_processors/registration.py b/queue_services/entity-bn/src/entity_bn/bn_processors/registration.py index 05c188e9c2..47d976e8ff 100644 --- a/queue_services/entity-bn/src/entity_bn/bn_processors/registration.py +++ b/queue_services/entity-bn/src/entity_bn/bn_processors/registration.py @@ -13,6 +13,7 @@ # limitations under the License. """File processing rules and actions for the registration of a business.""" import json +import uuid import xml.etree.ElementTree as Et from contextlib import suppress from http import HTTPStatus @@ -133,6 +134,23 @@ async def process(business: Business, # pylint: disable=too-many-branches, too- logger.error('Failed to publish BN email message onto the NATS emailer subject', exc_info=True) raise err + # publish identifier (so other things know business has changed) + try: + payload = { + 'specversion': '1.x-wip', + 'type': 'bc.registry.business.bn', + 'source': 'entity-bn.cb_subscription_handler', + 'id': str(uuid.uuid4()), + 'time': datetime.utcnow().isoformat(), + 'datacontenttype': 'application/json', + 'identifier': business.identifier, + 'data': {} + } + subject = current_app.config['SUBSCRIPTION_OPTIONS']['subject'] + await publish_event(payload, subject) + except Exception as err: # pylint: disable=broad-except; # noqa: B902 + logger.error('Failed to publish BN update for %s %s', business.identifier, err, exc_info=True) + def _inform_cra(business: Business, # pylint: disable=too-many-locals request_tracker: RequestTracker, diff --git a/queue_services/entity-bn/src/entity_bn/worker.py b/queue_services/entity-bn/src/entity_bn/worker.py index ee5635f818..a583f83950 100644 --- a/queue_services/entity-bn/src/entity_bn/worker.py +++ b/queue_services/entity-bn/src/entity_bn/worker.py @@ -27,7 +27,6 @@ """ import json import os -import uuid from typing import Dict import nats @@ -36,7 +35,6 @@ from legal_api import db from legal_api.core import Filing as FilingCore from legal_api.models import Business -from legal_api.utils.datetime import datetime from sentry_sdk import capture_message from sqlalchemy.exc import OperationalError @@ -46,7 +44,6 @@ change_of_registration, correction, dissolution_or_put_back_on, - publish_event, registration, ) from entity_bn.exceptions import BNException, BNRetryExceededException @@ -76,7 +73,6 @@ async def process_event(msg: Dict, flask_app: Flask): # pylint: disable=too-man with flask_app.app_context(): if msg['type'] == 'bc.registry.admin.bn': await admin.process(msg) - return msg['data']['business']['identifier'] filing_core_submission = FilingCore.find_by_id(msg['data']['filing']['header']['filingId']) if not filing_core_submission: @@ -101,8 +97,6 @@ async def process_event(msg: Dict, flask_app: Flask): # pylint: disable=too-man Business.LegalTypes.PARTNERSHIP.value): dissolution_or_put_back_on.process(business, filing_core_submission.storage) - return business.identifier - async def cb_subscription_handler(msg: nats.aio.client.Msg): """Use Callback to process Queue Msg objects.""" @@ -110,24 +104,7 @@ async def cb_subscription_handler(msg: nats.aio.client.Msg): logger.info('Received raw message seq:%s, data= %s', msg.sequence, msg.data.decode()) event_message = json.loads(msg.data.decode('utf-8')) logger.debug('Event Message Received: %s', event_message) - identifier = await process_event(event_message, FLASK_APP) - # publish identifier (so other things know business has changed) - try: - payload = { - 'specversion': '1.x-wip', - 'type': 'bc.registry.business.bn', - 'source': 'entity-bn.cb_subscription_handler', - 'id': str(uuid.uuid4()), - 'time': datetime.utcnow().isoformat(), - 'datacontenttype': 'application/json', - 'identifier': identifier, - 'data': {} - } - subject = APP_CONFIG.SUBSCRIPTION_OPTIONS['subject'] - await publish_event(payload, subject) - except Exception as err: # pylint: disable=broad-except; # noqa: B902 - capture_message('Entity-bn queue publish identifier error: ' + identifier, level='error') - logger.error('Queue Publish queue publish identifier error: %s %s', identifier, err, exc_info=True) + await process_event(event_message, FLASK_APP) except OperationalError as err: logger.error('Queue Blocked - Database Issue: %s', json.dumps(event_message), exc_info=True) raise err # We don't want to handle the error, as a DB down would drain the queue diff --git a/queue_services/entity-bn/tests/unit/bn_processors/test_registration.py b/queue_services/entity-bn/tests/unit/bn_processors/test_registration.py index 922c038e70..489c80b539 100644 --- a/queue_services/entity-bn/tests/unit/bn_processors/test_registration.py +++ b/queue_services/entity-bn/tests/unit/bn_processors/test_registration.py @@ -14,6 +14,7 @@ """The Test Suites to ensure that the registration is operating correctly.""" import xml.etree.ElementTree as Et +from flask import current_app import pytest from legal_api.models import Business, RequestTracker @@ -47,7 +48,12 @@ def side_effect(input_xml): return 200, acknowledgement_response mocker.patch('entity_bn.bn_processors.registration.request_bn_hub', side_effect=side_effect) - mocker.patch('entity_bn.bn_processors.registration.publish_event') + + subjects_in_queue = {} + def publish_event(payload, subject): + subjects_in_queue[subject] = payload + + mocker.patch('entity_bn.bn_processors.registration.publish_event', side_effect=publish_event) business_number = '993775204' business_program_id = 'BC' @@ -87,6 +93,10 @@ def side_effect(input_xml): business = Business.find_by_internal_id(business_id) assert business.tax_id == f'{business_number}{business_program_id}{str(program_account_ref_no).zfill(4)}' + assert current_app.config['EMAIL_PUBLISH_OPTIONS']['subject'] in subjects_in_queue + assert current_app.config['SUBSCRIPTION_OPTIONS']['subject'] in subjects_in_queue + + @pytest.mark.asyncio @pytest.mark.parametrize('request_type', [ (RequestTracker.RequestType.INFORM_CRA),