Skip to content

Commit

Permalink
22819 publish an event to ENTITY_EVENT_SUBJECT while receiving BN15 (b…
Browse files Browse the repository at this point in the history
  • Loading branch information
vysakh-menon-aot authored Aug 21, 2024
1 parent 0b7625b commit 32e8859
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""File processing rules and actions for the registration of a business."""
import json
import uuid
import xml.etree.ElementTree as Et
from contextlib import suppress
from http import HTTPStatus
Expand Down Expand Up @@ -133,6 +134,23 @@ async def process(business: Business, # pylint: disable=too-many-branches, too-
logger.error('Failed to publish BN email message onto the NATS emailer subject', exc_info=True)
raise err

# publish identifier (so other things know business has changed)
try:
payload = {
'specversion': '1.x-wip',
'type': 'bc.registry.business.bn',
'source': 'entity-bn.cb_subscription_handler',
'id': str(uuid.uuid4()),
'time': datetime.utcnow().isoformat(),
'datacontenttype': 'application/json',
'identifier': business.identifier,
'data': {}
}
subject = current_app.config['SUBSCRIPTION_OPTIONS']['subject']
await publish_event(payload, subject)
except Exception as err: # pylint: disable=broad-except; # noqa: B902
logger.error('Failed to publish BN update for %s %s', business.identifier, err, exc_info=True)


def _inform_cra(business: Business, # pylint: disable=too-many-locals
request_tracker: RequestTracker,
Expand Down
25 changes: 1 addition & 24 deletions queue_services/entity-bn/src/entity_bn/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"""
import json
import os
import uuid
from typing import Dict

import nats
Expand All @@ -36,7 +35,6 @@
from legal_api import db
from legal_api.core import Filing as FilingCore
from legal_api.models import Business
from legal_api.utils.datetime import datetime
from sentry_sdk import capture_message
from sqlalchemy.exc import OperationalError

Expand All @@ -46,7 +44,6 @@
change_of_registration,
correction,
dissolution_or_put_back_on,
publish_event,
registration,
)
from entity_bn.exceptions import BNException, BNRetryExceededException
Expand Down Expand Up @@ -76,7 +73,6 @@ async def process_event(msg: Dict, flask_app: Flask): # pylint: disable=too-man
with flask_app.app_context():
if msg['type'] == 'bc.registry.admin.bn':
await admin.process(msg)
return msg['data']['business']['identifier']

filing_core_submission = FilingCore.find_by_id(msg['data']['filing']['header']['filingId'])
if not filing_core_submission:
Expand All @@ -101,33 +97,14 @@ async def process_event(msg: Dict, flask_app: Flask): # pylint: disable=too-man
Business.LegalTypes.PARTNERSHIP.value):
dissolution_or_put_back_on.process(business, filing_core_submission.storage)

return business.identifier


async def cb_subscription_handler(msg: nats.aio.client.Msg):
"""Use Callback to process Queue Msg objects."""
try:
logger.info('Received raw message seq:%s, data= %s', msg.sequence, msg.data.decode())
event_message = json.loads(msg.data.decode('utf-8'))
logger.debug('Event Message Received: %s', event_message)
identifier = await process_event(event_message, FLASK_APP)
# publish identifier (so other things know business has changed)
try:
payload = {
'specversion': '1.x-wip',
'type': 'bc.registry.business.bn',
'source': 'entity-bn.cb_subscription_handler',
'id': str(uuid.uuid4()),
'time': datetime.utcnow().isoformat(),
'datacontenttype': 'application/json',
'identifier': identifier,
'data': {}
}
subject = APP_CONFIG.SUBSCRIPTION_OPTIONS['subject']
await publish_event(payload, subject)
except Exception as err: # pylint: disable=broad-except; # noqa: B902
capture_message('Entity-bn queue publish identifier error: ' + identifier, level='error')
logger.error('Queue Publish queue publish identifier error: %s %s', identifier, err, exc_info=True)
await process_event(event_message, FLASK_APP)
except OperationalError as err:
logger.error('Queue Blocked - Database Issue: %s', json.dumps(event_message), exc_info=True)
raise err # We don't want to handle the error, as a DB down would drain the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""The Test Suites to ensure that the registration is operating correctly."""
import xml.etree.ElementTree as Et

from flask import current_app
import pytest
from legal_api.models import Business, RequestTracker

Expand Down Expand Up @@ -47,7 +48,12 @@ def side_effect(input_xml):
return 200, acknowledgement_response

mocker.patch('entity_bn.bn_processors.registration.request_bn_hub', side_effect=side_effect)
mocker.patch('entity_bn.bn_processors.registration.publish_event')

subjects_in_queue = {}
def publish_event(payload, subject):
subjects_in_queue[subject] = payload

mocker.patch('entity_bn.bn_processors.registration.publish_event', side_effect=publish_event)

business_number = '993775204'
business_program_id = 'BC'
Expand Down Expand Up @@ -87,6 +93,10 @@ def side_effect(input_xml):
business = Business.find_by_internal_id(business_id)
assert business.tax_id == f'{business_number}{business_program_id}{str(program_account_ref_no).zfill(4)}'

assert current_app.config['EMAIL_PUBLISH_OPTIONS']['subject'] in subjects_in_queue
assert current_app.config['SUBSCRIPTION_OPTIONS']['subject'] in subjects_in_queue


@pytest.mark.asyncio
@pytest.mark.parametrize('request_type', [
(RequestTracker.RequestType.INFORM_CRA),
Expand Down

0 comments on commit 32e8859

Please sign in to comment.