Skip to content

Commit

Permalink
Restore fk reference from notification to hook
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Jan 25, 2024
1 parent 602e16b commit 5452137
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 39 deletions.
6 changes: 4 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ LIMIT 1`, [projection.id]);

async #getNextNotification(tx, maxAttempts) {
const { rows } = await tx.query(`
SELECT n.id, n.hook_name, n.hook_event, n.attempts, n.projection_name, n.projection_version
FROM fby_get_next_notification($1) n`, [maxAttempts]);
SELECT n.id, h.name AS hook_name, h.event AS hook_event, n.attempts, n.projection_name, n.projection_version
FROM fby_get_next_notification($1) n
INNER JOIN fby_hook h ON h.id = n.hook_id
`, [maxAttempts]);
const notifications = rows.map(toNotification);
return notifications[0];
}
Expand Down
2 changes: 1 addition & 1 deletion lib/partials/add-projection.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ BEGIN
(v_projection_id, v_entity_id);
{{/dependencies}}

PERFORM fby_schedule_notification(h.name, 'ADD_PROJECTION', {{literal name}}, {{literal version}})
PERFORM fby_schedule_notification(h.id, {{literal name}}, {{literal version}})
FROM fby_hook h
WHERE h.event = 'ADD_PROJECTION';

Expand Down
10 changes: 5 additions & 5 deletions lib/partials/drop-projection.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ BEGIN
RAISE EXCEPTION 'Projection '{{literal name (v version)}}' does not exist';
END IF;

DELETE FROM fby_projection
WHERE name = {{literal name}}
AND version = {{literal version}};

DELETE FROM fby_notification
WHERE projection_name = {{literal name}}
AND projection_version = {{literal version}};

PERFORM fby_schedule_notification(h.name, 'DROP_PROJECTION', {{literal name}}, {{literal version}})
PERFORM fby_schedule_notification(h.id, {{literal name}}, {{literal version}})
FROM fby_hook h
WHERE h.event = 'DROP_PROJECTION';

DELETE FROM fby_projection
WHERE name = {{literal name}}
AND version = {{literal version}};

END;
$$ LANGUAGE plpgsql;
27 changes: 12 additions & 15 deletions migrations/005.create-fby-notification-mechanism.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,23 @@ CREATE TYPE fby_notification_status AS ENUM ('PENDING', 'OK');

CREATE TABLE fby_notification (
id SERIAL PRIMARY KEY,
hook_name TEXT NOT NULL,
hook_event fby_event_type NOT NULL,
hook_id INTEGER NOT NULL REFERENCES fby_hook (id) ON DELETE CASCADE,
projection_name TEXT NOT NULL,
projection_version INTEGER NOT NULL,
scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
attempts INTEGER DEFAULT 0,
status fby_notification_status NOT NULL DEFAULT 'PENDING',
last_attempted TIMESTAMP WITH TIME ZONE,
last_error TEXT,
CONSTRAINT uniq_fby_notification_hname_hevent_pname_pversion_status UNIQUE (hook_name, hook_event, projection_name, projection_version, status)
CONSTRAINT uniq_fby_notification_hid_pname_pversion_status UNIQUE (hook_id, projection_name, projection_version, status)
);

CREATE FUNCTION fby_schedule_notification(p_hook_name TEXT, p_hook_event fby_event_type, p_projection_name TEXT, p_projection_version INTEGER) RETURNS VOID
CREATE FUNCTION fby_schedule_notification(p_hook_id INTEGER, p_projection_name TEXT, p_projection_version INTEGER) RETURNS VOID
AS $$
BEGIN
INSERT INTO fby_notification(hook_name, hook_event, projection_name, projection_version) VALUES
(p_hook_name, p_hook_event, p_projection_name, p_projection_version)
ON CONFLICT (hook_name, hook_event, projection_name, projection_version, status) DO UPDATE SET
INSERT INTO fby_notification(hook_id, projection_name, projection_version) VALUES
(p_hook_id, p_projection_name, p_projection_version)
ON CONFLICT (hook_id, projection_name, projection_version, status) DO UPDATE SET
id = EXCLUDED.id,
scheduled_for = EXCLUDED.scheduled_for,
attempts = 0,
Expand All @@ -43,7 +42,7 @@ BEGIN
WHERE e.name = p_name AND e.version = p_version
)
LOOP
PERFORM fby_schedule_notification(h.name, 'ADD_CHANGE_SET', projection.name, projection.version)
PERFORM fby_schedule_notification(h.id, projection.name, projection.version)
FROM fby_hook h
WHERE h.event = 'ADD_CHANGE_SET'
AND (h.projection_id = projection.id OR h.projection_id IS NULL);
Expand All @@ -54,8 +53,7 @@ $$ LANGUAGE plpgsql;
CREATE FUNCTION fby_get_next_notification(p_max_attempts INTEGER)
RETURNS TABLE (
id INTEGER,
hook_name TEXT,
hook_event fby_event_type,
hook_id INTEGER,
attempts INTEGER,
projection_name TEXT,
projection_version INTEGER
Expand All @@ -65,8 +63,7 @@ BEGIN
RETURN QUERY
SELECT
n.id,
n.hook_name,
n.hook_event,
n.hook_id,
n.attempts + 1 AS attempts,
n.projection_name,
n.projection_version
Expand All @@ -83,10 +80,10 @@ $$ LANGUAGE plpgsql;
CREATE FUNCTION fby_pass_notification(p_id INTEGER) RETURNS VOID
AS $$
DECLARE
v_hook_name TEXT;
v_hook_id TEXT;
BEGIN
SELECT hook_name FROM fby_notification n WHERE n.id = p_id INTO v_hook_name;
DELETE FROM fby_notification n WHERE n.hook_name = v_hook_name AND n.status = 'OK';
SELECT hook_id FROM fby_notification n WHERE n.id = p_id INTO v_hook_id;
DELETE FROM fby_notification n WHERE n.hook_id = v_hook_id AND n.status = 'OK';
UPDATE fby_notification n
SET
attempts = n.attempts + 1,
Expand Down
12 changes: 6 additions & 6 deletions test/database-schema.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,28 @@ describe('Database Schema', () => {
});

describe('Notifications', () => {
it('should NOT cascade deletes from projection', async () => {
it('should cascade deletes from projection via hook', async () => {
await filby.withTransaction(async (tx) => {
await tx.query("INSERT INTO fby_projection (id, name, version) VALUES (1, 'Park', 1)");
await tx.query("INSERT INTO fby_hook (id, name, event, projection_id) VALUES (1, 'change', 'ADD_CHANGE_SET', 1)");
await tx.query("INSERT INTO fby_notification (id, hook_name, hook_event, projection_name, projection_version) VALUES (1, 'change', 'ADD_CHANGE_SET', 'Park', 1)");
await tx.query("INSERT INTO fby_notification (id, hook_id, projection_name, projection_version) VALUES (1, 1, 'Park', 1)");
await tx.query('DELETE FROM fby_projection');
});

const { rows: notifications } = await filby.withTransaction((tx) => tx.query('SELECT * from fby_notification'));
eq(notifications.length, 1);
eq(notifications.length, 0);
});

it('should NOT cascade deletes from hook', async () => {
it('should cascade deletes from hook', async () => {
await filby.withTransaction(async (tx) => {
await tx.query("INSERT INTO fby_projection (id, name, version) VALUES (1, 'Park', 1)");
await tx.query("INSERT INTO fby_hook (id, name, event, projection_id) VALUES (1, 'change', 'ADD_CHANGE_SET', 1)");
await tx.query("INSERT INTO fby_notification (id, hook_name, hook_event, projection_name, projection_version) VALUES (1, 'change', 'ADD_CHANGE_SET', 'Park', 1)");
await tx.query("INSERT INTO fby_notification (id, hook_id, projection_name, projection_version) VALUES (1, 1, 'Park', 1)");
await tx.query('DELETE FROM fby_hook');
});

const { rows: notifications } = await filby.withTransaction((tx) => tx.query('SELECT * from fby_notification'));
eq(notifications.length, 1);
eq(notifications.length, 0);
});
});

Expand Down
4 changes: 4 additions & 0 deletions test/dsl.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,10 @@ describe('DSL', () => {
await applyYaml(t.name, DROP_PROJECTION);

const countAfter = await countNotifications();
// -1 for deletion of the CHANGE_SET_PROJECTION notification via cascade
// -1 for deletion of the CHANGE_SET_GENERAL manual
// -1 for deletion of the ADD_PROJECTION manual
// +1 for the DROP_PROJECTION notification
eq(countAfter, 1);
});

Expand Down
20 changes: 10 additions & 10 deletions test/notifications.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ describe('Notifications', () => {
await tx.query(`INSERT INTO fby_hook (id, name, event, projection_id) VALUES
(1, 'VAT Rate Changed', 'ADD_CHANGE_SET', 1),
(2, 'CGT Rate Changed', 'ADD_CHANGE_SET', 2)`);
await tx.query(`INSERT INTO fby_notification (hook_name, hook_event, projection_name, projection_version) VALUES
('VAT Rate Changed', 'ADD_CHANGE_SET', 'VAT Rates', 1)`);
await tx.query(`INSERT INTO fby_notification (hook_id, projection_name, projection_version) VALUES
(1, 'VAT Rates', 1)`);
});

filby.subscribe('VAT Rate Changed', (notification) => {
Expand All @@ -73,8 +73,8 @@ describe('Notifications', () => {
await tx.query(`INSERT INTO fby_hook (id, name, event, projection_id) VALUES
(1, 'VAT Rate Changed', 'ADD_CHANGE_SET', 1),
(2, 'CGT Rate Changed', 'ADD_CHANGE_SET', 2)`);
await tx.query(`INSERT INTO fby_notification (hook_name, hook_event, projection_name, projection_version) VALUES
('VAT Rate Changed', 'ADD_CHANGE_SET', 'VAT Rates', 1)`);
await tx.query(`INSERT INTO fby_notification (hook_id, projection_name, projection_version) VALUES
(1, 'VAT Rates', 1)`);

});

Expand All @@ -95,8 +95,8 @@ describe('Notifications', () => {
await tx.query(`INSERT INTO fby_hook (id, name, event, projection_id) VALUES
(1, 'VAT Rate Changed', 'ADD_CHANGE_SET', 1),
(2, 'CGT Rate Changed', 'ADD_CHANGE_SET', 2)`);
await tx.query(`INSERT INTO fby_notification (hook_name, hook_event, projection_name, projection_version) VALUES
('VAT Rate Changed', 'ADD_CHANGE_SET', 'VAT Rates', 1)`);
await tx.query(`INSERT INTO fby_notification (hook_id, projection_name, projection_version) VALUES
(1, 'VAT Rates', 1)`);

});

Expand Down Expand Up @@ -124,8 +124,8 @@ describe('Notifications', () => {
await tx.query(`INSERT INTO fby_hook (id, name, event, projection_id) VALUES
(1, 'VAT Rate Changed', 'ADD_CHANGE_SET', 1),
(2, 'CGT Rate Changed', 'ADD_CHANGE_SET', 2)`);
await tx.query(`INSERT INTO fby_notification (hook_name, hook_event, projection_name, projection_version) VALUES
('VAT Rate Changed', 'ADD_CHANGE_SET', 'VAT Rates', 1)`);
await tx.query(`INSERT INTO fby_notification (hook_id, projection_name, projection_version) VALUES
(1, 'VAT Rates', 1)`);
});

let attempt = 0;
Expand Down Expand Up @@ -154,8 +154,8 @@ describe('Notifications', () => {
await tx.query(`INSERT INTO fby_hook (id, name, event, projection_id) VALUES
(1, 'VAT Rate Changed', 'ADD_CHANGE_SET', 1),
(2, 'CGT Rate Changed', 'ADD_CHANGE_SET', 2)`);
await tx.query(`INSERT INTO fby_notification (hook_name, hook_event, projection_name, projection_version) VALUES
('VAT Rate Changed', 'ADD_CHANGE_SET', 'VAT Rates', 1)`);
await tx.query(`INSERT INTO fby_notification (hook_id, projection_name, projection_version) VALUES
(1, 'VAT Rates', 1)`);

});

Expand Down

0 comments on commit 5452137

Please sign in to comment.