Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store only notification id in the payload for lockable notifications #76

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
46 changes: 43 additions & 3 deletions pgpubsub/listen.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def process_notifications(connection_wrapper):
with transaction.atomic():
for processor in [
NotificationProcessor,
FullPayloadLockableNotificationProcessor, # for backward compatibility
LockableNotificationProcessor,
NotificationRecoveryProcessor,
]:
Expand Down Expand Up @@ -177,10 +178,22 @@ def get_extra_filter() -> Q:
else:
return Q()

class LockableNotificationProcessor(NotificationProcessor):

def is_int(val: str) -> bool:
try:
int(val)
except ValueError:
return False
else:
return True


# This is kept for backward compatibility for the migration phase when old notifications
# are still around. This should be removed after the transition phase.
class FullPayloadLockableNotificationProcessor(NotificationProcessor):

def validate(self):
if self.notification.payload == '':
if self.notification.payload == '' or is_int(self.notification.payload):
raise InvalidNotificationProcessor

def process(self):
Expand All @@ -199,7 +212,7 @@ def process(self):
).first()
)
if notification is None:
logger.info(f'Could not obtain a lock on notification '
logger.info(f'Could not obtain a lock on notification pid='
f'{self.notification.pid}\n')
else:
logger.info(f'Obtained lock on {notification}')
Expand All @@ -208,6 +221,33 @@ def process(self):
self.notification.delete()


class LockableNotificationProcessor(NotificationProcessor):

def validate(self):
if self.notification.payload == '' or not is_int(self.notification.payload):
raise InvalidNotificationProcessor

def process(self):
logger.info(
f'Processing notification for {self.channel_cls.name()}')
notification_id = int(self.notification.payload)
notification = (
Notification.objects.select_for_update(
skip_locked=True).filter(
Q(id=notification_id) & get_extra_filter(),
channel=self.notification.channel,
).first()
)
if notification is None:
logger.info(f'Could not obtain a lock on notification id='
f'{notification_id}\n')
else:
logger.info(f'Obtained lock on {notification}')
self.notification = notification
self._execute()
self.notification.delete()


class NotificationRecoveryProcessor(LockableNotificationProcessor):

def validate(self):
Expand Down
9 changes: 5 additions & 4 deletions pgpubsub/tests/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ def another_author_trigger(

@pgpubsub.post_delete_listener(PostTriggerChannel)
def email_author(old: Post, new: Post):
author = Author.objects.get(pk=old.author_id)
print(f'Emailing {author.name} to inform then post '
f'{old.pk} has been deleted.')
email(author)
if old and old.author_id:
author = Author.objects.get(pk=old.author_id)
print(f'Emailing {author.name} to inform then post '
f'{old.pk} has been deleted.')
email(author)


def email(author: Author):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Generated by Django 3.2.12 on 2024-02-16 11:37

from django.db import migrations
import pgtrigger.compiler
import pgtrigger.migrations


class Migration(migrations.Migration):

dependencies = [
('tests', '0012_payload_context'),
]

operations = [
pgtrigger.migrations.RemoveTrigger(
model_name='author',
name='pgpubsub_160cf',
),
pgtrigger.migrations.RemoveTrigger(
model_name='child',
name='pgpubsub_89ef9',
),
pgtrigger.migrations.RemoveTrigger(
model_name='childofabstract',
name='pgpubsub_b1c0b',
),
pgtrigger.migrations.RemoveTrigger(
model_name='media',
name='pgpubsub_a83de',
),
pgtrigger.migrations.RemoveTrigger(
model_name='post',
name='pgpubsub_72091',
),
pgtrigger.migrations.AddTrigger(
model_name='author',
trigger=pgtrigger.compiler.Trigger(name='pgpubsub_160cf', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE notify_payload TEXT; payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Author"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_160cf\', payload)\n RETURNING id::text INTO notify_payload;\n \n perform pg_notify(\'pgpubsub_160cf\', notify_payload);\n RETURN NEW;\n ', hash='5059766458d9cc08bdc82542d6c73b99bc19c98b', operation='INSERT', pgid='pgtrigger_pgpubsub_160cf_72a36', table='tests_author', when='AFTER')),
),
pgtrigger.migrations.AddTrigger(
model_name='child',
trigger=pgtrigger.compiler.Trigger(name='pgpubsub_89ef9', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE notify_payload TEXT; payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Child"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_89ef9\', payload)\n RETURNING id::text INTO notify_payload;\n \n perform pg_notify(\'pgpubsub_89ef9\', notify_payload);\n RETURN NEW;\n ', hash='5ef4245b21803f43b5470ef807fc61a9beb8e6e3', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_89ef9_92bc1', table='tests_child', when='AFTER')),
),
pgtrigger.migrations.AddTrigger(
model_name='childofabstract',
trigger=pgtrigger.compiler.Trigger(name='pgpubsub_b1c0b', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE notify_payload TEXT; payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "ChildOfAbstract"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_b1c0b\', payload)\n RETURNING id::text INTO notify_payload;\n \n perform pg_notify(\'pgpubsub_b1c0b\', notify_payload);\n RETURN NEW;\n ', hash='70e23b76598e1697643f8a473d77ba6e04f4c20a', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_b1c0b_c8531', table='tests_childofabstract', when='AFTER')),
),
pgtrigger.migrations.AddTrigger(
model_name='media',
trigger=pgtrigger.compiler.Trigger(name='pgpubsub_a83de', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE notify_payload TEXT; payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Media"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_a83de\', payload)\n RETURNING id::text INTO notify_payload;\n \n perform pg_notify(\'pgpubsub_a83de\', notify_payload);\n RETURN NEW;\n ', hash='8b7e1505347519e2f80c35e9da6fa5da7c133ee2', operation='UPDATE OR INSERT', pgid='pgtrigger_pgpubsub_a83de_cacbb', table='tests_media', when='AFTER')),
),
pgtrigger.migrations.AddTrigger(
model_name='post',
trigger=pgtrigger.compiler.Trigger(name='pgpubsub_72091', sql=pgtrigger.compiler.UpsertTriggerSql(declare='DECLARE notify_payload TEXT; payload JSONB; notification_context_text TEXT;', func='\n \n payload := \'{"app": "tests", "model": "Post"}\'::jsonb;\n payload := jsonb_insert(payload, \'{old}\', COALESCE(to_jsonb(OLD), \'null\'));\n payload := jsonb_insert(payload, \'{new}\', COALESCE(to_jsonb(NEW), \'null\'));\n SELECT current_setting(\'pgpubsub.notification_context\', True) INTO notification_context_text;\n IF COALESCE(notification_context_text, \'\') = \'\' THEN\n notification_context_text := \'{}\';\n END IF;\n payload := jsonb_insert(payload, \'{context}\', notification_context_text::jsonb);\n \n \n INSERT INTO pgpubsub_notification (channel, payload)\n VALUES (\'pgpubsub_72091\', payload)\n RETURNING id::text INTO notify_payload;\n \n perform pg_notify(\'pgpubsub_72091\', notify_payload);\n RETURN NEW;\n ', hash='c6e87030e2953f454dadd7d4f0b2fcb9de3d9ccb', operation='DELETE', pgid='pgtrigger_pgpubsub_72091_67aeb', table='tests_post', when='AFTER')),
),
]
223 changes: 223 additions & 0 deletions pgpubsub/tests/test_notification_processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import datetime
from decimal import Decimal
from typing import Callable
from unittest.mock import Mock

import pytest
from django.db import transaction

from pgpubsub.channel import registry
from pgpubsub.compatibility import Notify
from pgpubsub.listen import (
FullPayloadLockableNotificationProcessor,
InvalidNotificationProcessor,
LockableNotificationProcessor,
)
from pgpubsub.models import Notification
from pgpubsub.tests.channels import PostTriggerChannel
from pgpubsub.tests.models import Post


@pytest.fixture
def pg_connection() -> Mock:
return Mock()


def pg_notification(channel: str, payload: str) -> Notify:
return Notify(channel=channel, payload=payload, pid=1)


@pytest.fixture
def callback() -> Mock:
mock = Mock()
PostTriggerChannel.register(mock)
yield mock
del registry[PostTriggerChannel]


SOME_DATETIME = datetime.datetime.utcnow()

CHANNEL_NAME = PostTriggerChannel.listen_safe_name()


@pytest.mark.django_db(transaction=True)
def test_lockable_notification_processor_processes_legacy_insert_payload(
callback: Callable, pg_connection
):
post = Post(content='some-content', date=SOME_DATETIME, pk=1, rating=Decimal("1.1"))
stored_payload = f"""
{{
"app": "tests",
"model": "Post",
"old": null,
"new": {{
"content": "{post.content}",
"date": "{SOME_DATETIME.isoformat()}",
"id": {post.pk},
"rating": "1.1"
}}
}}
"""
notification = Notification.objects.create(
channel=CHANNEL_NAME, payload=stored_payload
)

sut = FullPayloadLockableNotificationProcessor(
pg_notification(channel=CHANNEL_NAME, payload=stored_payload), pg_connection
)

with transaction.atomic():
sut.process()

callback.assert_called_with(old=None, new=post)

@pytest.mark.django_db(transaction=True)
def test_lockable_notification_processor_processes_legacy_update_payload(
callback: Callable, pg_connection
):
old_post = Post(
content='some-old-content', date=SOME_DATETIME, pk=1, rating=Decimal("1.2")
)
new_post = Post(
content='some-new-content', date=SOME_DATETIME, pk=1, rating=Decimal("1.3")
)
stored_payload = f"""
{{
"app": "tests",
"model": "Post",
"old": {{
"content": "{old_post.content}",
"date": "{SOME_DATETIME.isoformat()}",
"id": 1,
"rating": "1.1"
}},
"new": {{
"content": "{new_post.content}",
"date": "{SOME_DATETIME.isoformat()}",
"id": 1,
"rating": "1.2"
}}
}}
"""
notification = Notification.objects.create(
channel=CHANNEL_NAME, payload=stored_payload
)

sut = FullPayloadLockableNotificationProcessor(
pg_notification(channel=CHANNEL_NAME, payload=stored_payload), pg_connection
)

with transaction.atomic():
sut.process()

callback.assert_called_with(old=old_post, new=new_post)


@pytest.mark.django_db(transaction=True)
def test_legacy_lockable_notification_processor_does_not_support_id_only_payloads(
callback: Callable, pg_connection
):
stored_payload = f"""
{{
"app": "tests",
"model": "Post",
"old": {{
"content": "old_content",
"date": "{SOME_DATETIME.isoformat()}",
"id": 1,
"rating": "1.1"
}},
"new": {{
"content": "new_content",
"date": "{SOME_DATETIME.isoformat()}",
"id": 1,
"rating": "1.2"
}}
}}
"""
notification = Notification.objects.create(
channel=CHANNEL_NAME, payload=stored_payload
)

with pytest.raises(InvalidNotificationProcessor):
FullPayloadLockableNotificationProcessor(
pg_notification(channel=CHANNEL_NAME, payload=str(notification.id)),
pg_connection,
)


@pytest.mark.django_db(transaction=True)
def test_lockable_notification_processor_processes_id_only_payload_for_insert(
callback: Callable, pg_connection
):
post = Post(content='some-content', date=SOME_DATETIME, pk=1, rating=Decimal("1.2")
)
stored_payload = f"""
{{
"app": "tests",
"model": "Post",
"old": null,
"new": {{
"content": "{post.content}",
"date": "{SOME_DATETIME.isoformat()}",
"id": {post.pk},
"rating": "1.1"
}}
}}
"""
notification = Notification.objects.create(
channel=CHANNEL_NAME, payload=stored_payload
)

sut = LockableNotificationProcessor(
pg_notification(channel=CHANNEL_NAME, payload=str(notification.id)),
pg_connection,
)

with transaction.atomic():
sut.process()

callback.assert_called_with(old=None, new=post)


@pytest.mark.django_db(transaction=True)
def test_lockable_notification_processor_processes_id_only_payload_for_update(
callback: Callable, pg_connection
):
old_post = Post(
content='some-old-content', date=SOME_DATETIME, pk=1, rating=Decimal("1.2")
)
new_post = Post(
content='some-new-content', date=SOME_DATETIME, pk=1, rating=Decimal("1.3")
)
stored_payload = f"""
{{
"app": "tests",
"model": "Post",
"old": {{
"content": "{old_post.content}",
"date": "{SOME_DATETIME.isoformat()}",
"id": 1,
"rating": "1.1"
}},
"new": {{
"content": "{new_post.content}",
"date": "{SOME_DATETIME.isoformat()}",
"id": 1,
"rating": "1.2"
}}
}}
"""
notification = Notification.objects.create(
channel=CHANNEL_NAME, payload=stored_payload
)

sut = LockableNotificationProcessor(
pg_notification(channel=CHANNEL_NAME, payload=str(notification.id)),
pg_connection,
)

with transaction.atomic():
sut.process()

callback.assert_called_with(old=old_post, new=new_post)
Loading