Skip to content

Commit

Permalink
20732 - Queue fixes (bcgov#2828)
Browse files Browse the repository at this point in the history
* Debugging print

* more logging to figure out an issue

* more debug

* more debug

* Disable update env

* remove self._publisher

* Fix up the queue

* Change logging a bit, fix make files so it's rapid fire.

* Move logging down a tad

* Move logging.conf

* Add in error for unknown message type, add in enums for missing sections

* Put in detection against duplicate queue messages for account-mailer.

* Point at new auth-api

* lint fix

* lint fix

* lint fixes

* Add in unit test for duplicate message ids

* rename accident

* Fix enums in account-mailer so they match sbc-common

* use correct enum

* Update URL, other one causes an error

* Fix indent

* Add in duplicate message handling for auth-queue.

* fix up requirements

* Fix linting issues

* See if unit test passes again

* restore update-env for makefiles

* lint + test fix
  • Loading branch information
seeker25 authored May 21, 2024
1 parent 543c4f5 commit 7f88114
Show file tree
Hide file tree
Showing 24 changed files with 169 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Add in new table for account mailer for pubsub message processing.
Revision ID: b3a741249edc
Revises: e2d1d6417607
Create Date: 2024-05-15 14:52:45.780399
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'b3a741249edc'
down_revision = 'e2d1d6417607'
branch_labels = None
depends_on = None


def upgrade():
op.create_table('pubsub_message_processing',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('cloud_event_id', sa.String(length=250), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('message_type', sa.String(length=250), nullable=False),
sa.Column('processed', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_pubsub_message_processing_id'), 'pubsub_message_processing', ['id'], unique=False)


def downgrade():
op.drop_index(op.f('ix_pubsub_message_processing_id'), table_name='pubsub_message_processing')
op.drop_table('pubsub_message_processing')
1 change: 1 addition & 0 deletions auth-api/src/auth_api/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from .product_subscription import ProductSubscription
from .product_subscriptions_status import ProductSubscriptionsStatus
from .product_type_code import ProductTypeCode
from .pubsub_message_processing import PubSubMessageProcessing
from .suspension_reason_code import SuspensionReasonCode
from .task import Task
from .user import User
Expand Down
2 changes: 1 addition & 1 deletion auth-api/src/auth_api/models/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Membership(VersionedModel): # pylint: disable=too-few-public-methods # Te
user = relationship('User', foreign_keys=[user_id], lazy='select')
org = relationship('Org', foreign_keys=[org_id], lazy='select')

def __init__(self, **kwargs):
def __init__(self, **kwargs): # pylint: disable=super-init-not-called
"""Initialize a new membership."""
self.org_id = kwargs.get('org_id')
self.user_id = kwargs.get('user_id')
Expand Down
32 changes: 32 additions & 0 deletions auth-api/src/auth_api/models/pubsub_message_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""This model manages pubsub message processing.
NOTE: Only use this when it's not possible to use other indicators to track message processing.
Currently used by the account-mailer / auth-queue. This prevents duplicates.
"""
import datetime as dt
import pytz

from sqlalchemy import Column, DateTime, Integer, String
from .db import db


class PubSubMessageProcessing(db.Model):
"""PubSub Message Processing for cloud event messages."""

__tablename__ = 'pubsub_message_processing'

id = Column(Integer, index=True, primary_key=True)
cloud_event_id = Column(String(250), nullable=False)
created = Column(DateTime, default=dt.datetime.now(pytz.utc))
message_type = Column(String(250), nullable=False)
processed = Column(DateTime, nullable=True)

@classmethod
def find_by_id(cls, identifier):
"""Find a pubsub message processing by id."""
return cls.query.filter_by(id=identifier).one_or_none()

@classmethod
def find_by_cloud_event_id_and_type(cls, cloud_event_id, message_type):
"""Find a pubsub message processing for cloud event id and type."""
return cls.query.filter_by(cloud_event_id=cloud_event_id, message_type=message_type).one_or_none()
1 change: 1 addition & 0 deletions auth-api/src/auth_api/services/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def check_auth(**kwargs):
if product_code_in_jwt == 'ALL': # Product code for super admin service account (sbc-auth-admin)
return

auth = None
if business_identifier:
auth = Authorization.get_user_authorizations_for_entity(business_identifier)
elif org_identifier:
Expand Down
2 changes: 1 addition & 1 deletion auth-api/src/auth_api/services/gcp_queue/gcp_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def publisher(self):
"""Returns the publisher."""
if not self._publisher and self.credentials_pub:
self._publisher = pubsub_v1.PublisherClient(credentials=self.credentials_pub)
else:
if not self._publisher:
self._publisher = pubsub_v1.PublisherClient()
return self._publisher

Expand Down
6 changes: 3 additions & 3 deletions auth-api/src/auth_api/services/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def send_notification_to_member(self, origin_url, notification_type):
notification_type_for_mailer = ''
data = {}
if notification_type == NotificationType.ROLE_CHANGED.value:
notification_type_for_mailer = 'roleChangedNotification'
notification_type_for_mailer = QueueMessageTypes.ROLE_CHANGED_NOTIFICATION.value
data = {
'accountId': org_id,
'emailAddresses': recipient,
Expand All @@ -181,9 +181,9 @@ def send_notification_to_member(self, origin_url, notification_type):
# TODO how to check properly if user is bceid user
is_bceid_user = self._model.user.username.find('@bceid') > 0
if is_bceid_user:
notification_type_for_mailer = 'membershipApprovedNotificationForBceid'
notification_type_for_mailer = QueueMessageTypes.MEMBERSHIP_APPROVED_NOTIFICATION_FOR_BCEID.value
else:
notification_type_for_mailer = 'membershipApprovedNotification'
notification_type_for_mailer = QueueMessageTypes.MEMBERSHIP_APPROVED_NOTIFICATION.value

data = {
'accountId': org_id,
Expand Down
4 changes: 2 additions & 2 deletions auth-api/src/auth_api/services/org.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,9 +878,9 @@ def send_approved_rejected_notification(receipt_admin_emails, org_name, org_id,
current_app.logger.debug('<send_approved_rejected_notification')

if org_status == OrgStatus.ACTIVE.value:
notification_type = QueueMessageTypes.NON_BCSC_ORG_APPROVED.value
notification_type = QueueMessageTypes.NON_BCSC_ORG_APPROVED_NOTIFICATION.value
elif org_status == OrgStatus.REJECTED.value:
notification_type = QueueMessageTypes.NON_BCSC_ORG_REJECTED.value
notification_type = QueueMessageTypes.NON_BCSC_ORG_REJECTED_NOTIFICATION.value
else:
return # Don't send mail for any other status change
app_url = f"{origin_url}/{current_app.config.get('AUTH_WEB_TOKEN_CONFIRM_PATH')}"
Expand Down
1 change: 1 addition & 0 deletions auth-api/src/auth_api/services/validators/payment_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def validate(is_fatal=False, **kwargs) -> ValidatorResponse:
OrgType.SBC_STAFF: non_ejv_payment_methods,
OrgType.STAFF: non_ejv_payment_methods,
}
payment_type = None
if access_type == AccessType.GOVM.value:
payment_type = PaymentMethod.EJV.value
elif selected_payment_method:
Expand Down
2 changes: 1 addition & 1 deletion auth-api/src/auth_api/utils/custom_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from sqlalchemy import String, func


class CustomQuery(BaseQuery):
class CustomQuery(BaseQuery): # pylint: disable=too-few-public-methods
"""Custom Query class to extend the base query class for helper functionality."""

def filter_conditionally(self, search_criteria, model_attribute, is_like: bool = False):
Expand Down
6 changes: 3 additions & 3 deletions auth-api/tests/unit/services/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ def test_check_auth_staff_path(session, monkeypatch, test_desc, test_expect, add
'test_desc,test_expect,additional_kwargs,is_org_member,is_entity_affiliated,product_code_in_jwt',
[
(
'Test UnboundLocalError when no role checks provided in kwargs, and no org_id or business_identifier.',
pytest.raises(UnboundLocalError), {}, False, False, ProductCode.BUSINESS.value
'Test 403 when no role checks provided in kwargs, and no org_id or business_identifier.',
pytest.raises(Forbidden), {}, False, False, ProductCode.BUSINESS.value
),
(
'Test OK when no role checks provided in kwargs, but has ALL product in jwt. (bypass all checks).',
Expand Down Expand Up @@ -359,7 +359,7 @@ def test_check_auth_system_path(session, monkeypatch, test_desc, test_expect, ad
'test_desc,test_expect,additional_kwargs,is_org_member,is_entity_affiliated',
[
(
'Test UnboundLocalError when no role checks provided in kwargs.',
'Test UnboundLocalError (403) when no role checks provided in kwargs.',
pytest.raises(UnboundLocalError), {}, False, False
),
(
Expand Down
2 changes: 1 addition & 1 deletion queue_services/account-mailer/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ tornado==6.4
urllib3==1.26.18
zipp==3.18.1
-e git+https://github.com/bcgov/sbc-common-components.git#egg=sbc-common-components&subdirectory=python
-e git+https://github.com/seeker25/sbc-auth.git@queue_upgrades#egg=auth-api&subdirectory=auth-api
-e git+https://github.com/bcgov/sbc-auth.git#egg=auth-api&subdirectory=auth-api
git+https://github.com/daxiom/simple-cloudevent.py.git
4 changes: 4 additions & 0 deletions queue_services/account-mailer/src/account_mailer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
from auth_api.services.flags import flags
from auth_api.services.gcp_queue import queue
from auth_api.utils.cache import cache
from auth_api.utils.util_logging import setup_logging
from flask import Flask
from sentry_sdk.integrations.flask import FlaskIntegration

from account_mailer import config
from account_mailer.resources.worker import bp as worker_endpoint


setup_logging(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'logging.conf')) # important to do this first


def register_endpoints(app: Flask):
"""Register endpoints with the flask application."""
# Allow base route to match with, and without a trailing slash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def _get_admin_emails(username):
admin_emails = admin_user.contacts[0].contact.email
else:
admin_emails = admin_user.email
else:
raise ValueError('Admin user not found, cannot determine email address.')

return admin_emails, admin_name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### To access the new application and get started on your online filing:

* Go to: [https://www.bcregistry.ca/business>](https://www.bcregistry.ca/business)
* Go to: [https://www.account.bcregistry.gov.bc.ca/](https://www.account.bcregistry.gov.bc.ca/)
* If you have not yet accessed this website, and need to create a new account, select the "Create a BC Registries Account" button to start. Otherwise, please select the "Login" drop down in the upper right corner of the screen to login to your existing account.
* Once logged into your new account, you can add your business on the manage businesses dashboard. Please select the "+ Add an Existing..." button on the right, and select "Business" from the dropdown selection.
* Use your business incorporation number: {{ businessIdentifier }} and passcode, {{ passCode }} to link your business to your account.
Expand Down
14 changes: 7 additions & 7 deletions queue_services/account-mailer/src/account_mailer/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class SubjectType(Enum):
MEMBERSHIP_APPROVED_NOTIFICATION = '[BC Registries and Online Services] Welcome to the account {account_name}'
MEMBERSHIP_APPROVED_NOTIFICATION_FOR_BCEID = '[BC Registries and Online Services] Welcome to the account ' \
'{account_name}'
NONBCSC_ORG_APPROVED_NOTIFICATION = '[BC Registries and Online Services] APPROVED Business Registry Account'
NONBCSC_ORG_REJECTED_NOTIFICATION = '[BC Registries and Online Services] YOUR ACTION REQUIRED: ' \
'Business Registry Account cannot be approved'
NON_BCSC_ORG_APPROVED_NOTIFICATION = '[BC Registries and Online Services] APPROVED Business Registry Account'
NON_BCSC_ORG_REJECTED_NOTIFICATION = '[BC Registries and Online Services] YOUR ACTION REQUIRED: ' \
'Business Registry Account cannot be approved'
OTP_AUTHENTICATOR_RESET_NOTIFICATION = '[BC Registries and Online Services] Authenticator Has Been Reset'
ROLE_CHANGED_NOTIFICATION = '[BC Registries and Online Services] Your Role Has Been Changed'
STAFF_REVIEW_ACCOUNT = '[BC Registries and Online Services] An out of province account needs to be approved.'
Expand Down Expand Up @@ -86,8 +86,8 @@ class TitleType(Enum):
GOVM_MEMBER_INVITATION = 'Invitation to Join an Account at Business Registry'
MEMBERSHIP_APPROVED_NOTIFICATION = 'Your Membership Has Been Approved'
MEMBERSHIP_APPROVED_NOTIFICATION_FOR_BCEID = 'Your Membership Has Been Approved'
NONBCSC_ORG_APPROVED_NOTIFICATION = 'Your Membership Has Been Approved'
NONBCSC_ORG_REJECTED_NOTIFICATION = 'Your Membership Has Been Rejected'
NON_BCSC_ORG_APPROVED_NOTIFICATION = 'Your Membership Has Been Approved'
NON_BCSC_ORG_REJECTED_NOTIFICATION = 'Your Membership Has Been Rejected'
OTP_AUTHENTICATOR_RESET_NOTIFICATION = 'Your Authenticator Has Been Reset'
ROLE_CHANGED_NOTIFICATION = 'Your Role Has Been Changed'
STAFF_REVIEW_ACCOUNT = 'Notification from Business Registry'
Expand Down Expand Up @@ -130,8 +130,8 @@ class TemplateType(Enum):
GOVM_MEMBER_INVITATION_TEMPLATE_NAME = 'govm_member_invitation_email'
MEMBERSHIP_APPROVED_NOTIFICATION_TEMPLATE_NAME = 'membership_approved_notification_email'
MEMBERSHIP_APPROVED_NOTIFICATION_FOR_BCEID_TEMPLATE_NAME = 'membership_approved_notification_email_for_bceid'
NONBCSC_ORG_APPROVED_NOTIFICATION_TEMPLATE_NAME = 'nonbcsc_org_approved_notification_email'
NONBCSC_ORG_REJECTED_NOTIFICATION_TEMPLATE_NAME = 'nonbcsc_org_rejected_notification_email'
NON_BCSC_ORG_APPROVED_NOTIFICATION_TEMPLATE_NAME = 'nonbcsc_org_approved_notification_email'
NON_BCSC_ORG_REJECTED_NOTIFICATION_TEMPLATE_NAME = 'nonbcsc_org_rejected_notification_email'
OTP_AUTHENTICATOR_RESET_NOTIFICATION_TEMPLATE_NAME = 'otp_authenticator_reset_notification_email'
ROLE_CHANGED_NOTIFICATION_TEMPLATE_NAME = 'role_changed_notification_email'
STAFF_REVIEW_ACCOUNT_TEMPLATE_NAME = 'staff_review_account_email'
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
"""The unique worker functionality for this service is contained here."""
import dataclasses
import json
from datetime import datetime
from datetime import datetime, timezone
from http import HTTPStatus

from auth_api.models import db
from auth_api.models.pubsub_message_processing import PubSubMessageProcessing
from auth_api.services.gcp_queue import queue
from auth_api.services.gcp_queue.gcp_auth import ensure_authorized_queue_user
from auth_api.services.rest_service import RestService
Expand Down Expand Up @@ -44,7 +46,10 @@ def worker():
return {}, HTTPStatus.OK

try:
current_app.logger.info('Event Message Received: %s', json.dumps(dataclasses.asdict(event_message)))
current_app.logger.info('Event message received: %s', json.dumps(dataclasses.asdict(event_message)))
if is_message_processed(event_message):
current_app.logger.info('Event message already processed, skipping.')
return {}, HTTPStatus.OK
message_type, email_msg = event_message.type, event_message.data
email_msg['logo_url'] = minio_service.MinioService.get_minio_public_url('bc_logo_for_email.png')

Expand Down Expand Up @@ -72,6 +77,19 @@ def worker():
return {}, HTTPStatus.OK


def is_message_processed(event_message):
"""Check if the queue message is processed."""
if PubSubMessageProcessing.find_by_cloud_event_id_and_type(event_message.id, event_message.type):
return True
pubsub_message_processing = PubSubMessageProcessing()
pubsub_message_processing.cloud_event_id = event_message.id
pubsub_message_processing.message_type = event_message.type
pubsub_message_processing.processed = datetime.now(timezone.utc)
db.session.add(pubsub_message_processing)
db.session.commit()
return False


def handle_drawdown_request(message_type, email_msg):
"""Handle the drawdown request message."""
if message_type != QueueMessageTypes.REFUND_DRAWDOWN_REQUEST.value:
Expand Down Expand Up @@ -443,6 +461,7 @@ def handle_other_messages(message_type, email_msg):
)
template_name = TemplateType[f'{QueueMessageTypes(message_type).name}_TEMPLATE_NAME'].value
else:
current_app.logger.error('Unknown message type: %s', message_type)
return

kwargs = {
Expand Down
26 changes: 26 additions & 0 deletions queue_services/account-mailer/tests/unit/test_worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ def test_refund_request(app, session, client):
mock_send.assert_called


def test_duplicate_messages(app, session, client):
"""Assert that duplicate messages are handled by the queue.."""
user = factory_user_model_with_contact()
org = factory_org_model()
factory_membership_model(user.id, org.id)
id = org.id
mail_details = {
'accountId': id,
'accountName': org.name
}

with patch.object(notification_service, 'send_email', return_value=None) as mock_send:
helper_add_event_to_queue(client, message_type=QueueMessageTypes.NSF_LOCK_ACCOUNT.value,
mail_details=mail_details, message_id='f76e5ca9-93f3-44ee-a0f8-f47ee83b1971')
mock_send.assert_called
assert mock_send.call_args.args[0].get('recipients') == '[email protected]'
assert mock_send.call_args.args[0].get('content').get('subject') == SubjectType.NSF_LOCK_ACCOUNT_SUBJECT.value
assert mock_send.call_args.args[0].get('attachments') is None
assert True

with patch.object(notification_service, 'send_email', return_value=None) as mock_send:
helper_add_event_to_queue(client, message_type=QueueMessageTypes.NSF_LOCK_ACCOUNT.value,
mail_details=mail_details, message_id='f76e5ca9-93f3-44ee-a0f8-f47ee83b1971')
mock_send.assert_not_called


def test_lock_account_mailer_queue(app, session, client):
"""Assert that events can be retrieved and decoded from the Queue."""
user = factory_user_model_with_contact()
Expand Down
8 changes: 4 additions & 4 deletions queue_services/account-mailer/tests/unit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
from simple_cloudevent import SimpleCloudEvent, to_queue_message


def build_request_for_queue_push(message_type, payload):
def build_request_for_queue_push(message_type, payload, message_id=None):
"""Build request for queue message."""
queue_message_bytes = to_queue_message(SimpleCloudEvent(
id=str(uuid.uuid4()),
id=str(message_id if message_id else uuid.uuid4()),
source='account-mailer',
subject=None,
time=datetime.now(tz=timezone.utc).isoformat(),
Expand All @@ -46,10 +46,10 @@ def post_to_queue(client, request_payload):
assert response.status_code == 200


def helper_add_event_to_queue(client, message_type: str, mail_details: dict):
def helper_add_event_to_queue(client, message_type: str, mail_details: dict, message_id=None):
"""Add event to the Queue."""
if not mail_details:
mail_details = {
}
payload = build_request_for_queue_push(message_type, mail_details)
payload = build_request_for_queue_push(message_type, mail_details, message_id)
post_to_queue(client, payload)
2 changes: 1 addition & 1 deletion queue_services/auth-queue/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ tornado==6.4
urllib3==1.26.18
zipp==3.18.1
-e git+https://github.com/bcgov/sbc-common-components.git#egg=sbc-common-components&subdirectory=python
-e git+https://github.com/seeker25/sbc-auth.git@queue_upgrades#egg=auth-api&subdirectory=auth-api
-e git+https://github.com/bcgov/sbc-auth.git#egg=auth-api&subdirectory=auth-api
git+https://github.com/daxiom/simple-cloudevent.py.git
4 changes: 4 additions & 0 deletions queue_services/auth-queue/src/auth_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
from auth_api.services.flags import flags
from auth_api.services.gcp_queue import queue
from auth_api.utils.cache import cache
from auth_api.utils.util_logging import setup_logging
from flask import Flask
from sentry_sdk.integrations.flask import FlaskIntegration

from auth_queue import config
from auth_queue.resources.worker import bp as worker_endpoint


setup_logging(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'logging.conf')) # important to do this first


def register_endpoints(app: Flask):
"""Register endpoints with the flask application."""
# Allow base route to match with, and without a trailing slash
Expand Down
File renamed without changes.
Loading

0 comments on commit 7f88114

Please sign in to comment.