Skip to content

Commit

Permalink
feat(eventBridge): moving corpus indexing to use event bridge types (#…
Browse files Browse the repository at this point in the history
…927)

* feat(eventBridge): moving corpus indexing to use event bridge types

* feat(eventBridge): removing empty objects

* feat(eventBridge): updating corpus parser hydration to use event package

* fix(nit): updating variable name
  • Loading branch information
bassrock authored Nov 9, 2024
1 parent cdfb7d4 commit 306f337
Show file tree
Hide file tree
Showing 23 changed files with 1,040 additions and 265 deletions.
1 change: 1 addition & 0 deletions lambdas/user-list-search-corpus-indexing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
},
"dependencies": {
"@opensearch-project/opensearch": "^2.10.0",
"@pocket-tools/event-bridge": "workspace:*",
"@pocket-tools/ts-logger": "workspace:*",
"@sentry/aws-serverless": "8.37.1",
"fetch-retry": "^5.0.6",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CollectionPayload } from '../types';
import { CollectionPayload } from '@pocket-tools/event-bridge';
import { config } from '../config';
import { buildCollectionUrl } from '../utils';

Expand Down
32 changes: 17 additions & 15 deletions lambdas/user-list-search-corpus-indexing/src/index.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { config } from './config';
import * as oci from './queries/originalCorpusId';
import * as ci from './queries/collectionId';
import * as api from './commands/ApprovedItem';
import { PocketEventType } from '@pocket-tools/event-bridge';

/**
* Test cleanup: delete all documents in corpus indices
Expand Down Expand Up @@ -159,9 +160,9 @@ describe('bulk indexer', () => {
const payloads: EventPayload[] = [
{
messageId: '123abc',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://some-url.com',
approvedItemExternalId: 'aaaaa',
language: 'en',
Expand All @@ -184,19 +185,19 @@ describe('bulk indexer', () => {
const payloads: EventPayload[] = [
{
messageId: '123abc',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://some-url.com',
approvedItemExternalId: 'bbbbbbb',
language: 'en',
},
},
{
messageId: '456def',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://eine-url.de',
approvedItemExternalId: 'ccccccc',
language: 'de',
Expand All @@ -205,7 +206,7 @@ describe('bulk indexer', () => {
// Collection
{
messageId: '456def',
detailType: 'add-collection',
detailType: PocketEventType.COLLECTION_CREATED,
detail: {
collection: {
externalId: '999rsk',
Expand All @@ -215,6 +216,7 @@ describe('bulk indexer', () => {
language: 'de',
createdAt: 123456,
updatedAt: 123456,
collection_iab_child_category_id: '1',
authors: [
{
name: 'anonym anonym',
Expand Down Expand Up @@ -287,19 +289,19 @@ describe('bulk indexer', () => {
const payloads: EventPayload[] = [
{
messageId: '123abc',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://some-url.com',
approvedItemExternalId: 'dddddd',
language: 'not-language-code',
},
},
{
messageId: '456def',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://eine-url.de',
approvedItemExternalId: 'eeeee',
language: 'de',
Expand Down Expand Up @@ -444,21 +446,21 @@ describe('bulk indexer', () => {
const payloads: EventPayload[] = [
{
messageId: '123abc',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
// Doesn't matter, overwritten by mock
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://some-url.com',
approvedItemExternalId: 'aaaaa',
language: 'en',
},
},
{
messageId: 'xijk-kel',
detailType: 'add-approved-item',
detailType: PocketEventType.CORPUS_ITEM_ADDED,
detail: {
// Doesn't matter, overwritten by mock
eventType: 'add-approved-item',
eventType: PocketEventType.CORPUS_ITEM_ADDED,
url: 'http://some-url.com',
approvedItemExternalId: 'aaaaa',
language: 'en',
Expand Down
22 changes: 16 additions & 6 deletions lambdas/user-list-search-corpus-indexing/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import type {
SQSEvent,
} from 'aws-lambda';
import {
EventPayload,
ValidatedEventPayload,
validDetailTypes,
CollectionApprovedItemPayload,
SyndicatedItemPayload,
EventPayload,
} from './types';
import { upsertCollection } from './commands/Collection';
import { mergeCollection } from './commands/ApprovedItemCollection';
Expand All @@ -24,6 +24,10 @@ import { upsertApprovedItem } from './commands/ApprovedItem';
import { postRetry } from './postRetry';
import { serverLogger } from '@pocket-tools/ts-logger';
import { removeApprovedItem } from './commands/RemoveItem';
import {
PocketEventType,
sqsEventBridgeEvent,
} from '@pocket-tools/event-bridge';

/**
* The main handler function which will be wrapped by Sentry prior to export.
Expand All @@ -34,13 +38,19 @@ import { removeApprovedItem } from './commands/RemoveItem';
*/
export async function processor(event: SQSEvent): Promise<SQSBatchResponse> {
const validPayloads: Array<EventPayload> = event.Records.map((record) => {
const message = JSON.parse(JSON.parse(record.body).Message);
const pocketEvent = sqsEventBridgeEvent(record);
if (
pocketEvent == null ||
!validDetailTypes.includes(pocketEvent['detail-type'])
) {
return null;
}
return {
messageId: record.messageId,
detailType: message['detail-type'],
detail: message['detail'],
detailType: event['detail-type'],
detail: pocketEvent.detail,
};
}).filter((message) => validDetailTypes.includes(message['detailType']));
}).filter((message) => message != null);
const result = await bulkIndex(validPayloads);
return result;
}
Expand Down Expand Up @@ -79,7 +89,7 @@ export async function bulkIndex(
for await (const validItem of validItems) {
// Deleting
if (
validItem.detailType === 'remove-approved-item' &&
validItem.detailType === PocketEventType.CORPUS_ITEM_REMOVED &&
// Not possible, but just for typescript...
!('collection' in validItem.detail)
) {
Expand Down
142 changes: 24 additions & 118 deletions lambdas/user-list-search-corpus-indexing/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
import {
CorpusEvent,
CorpusItemPayload,
CollectionPayload,
CollectionEvent,
PocketEventType,
IncomingBaseEvent,
} from '@pocket-tools/event-bridge';

type BaseEventPayload = (CollectionEvent | CorpusEvent) & IncomingBaseEvent;

export type EventPayload = {
messageId: string;
detailType: string;
detail: ApprovedItemPayload | CollectionPayload;
detail: BaseEventPayload['detail'];
detailType: PocketEventType;
};

export type ValidatedEventPayload = Omit<EventPayload, 'detail'> & {
detail: ValidLanguageApprovedItemPayload | CollectionPayload;
};

export const validDetailTypes: Array<PocketEventType> = [
PocketEventType.CORPUS_ITEM_ADDED,
PocketEventType.CORPUS_ITEM_UPDATED,
PocketEventType.CORPUS_ITEM_REMOVED,
PocketEventType.COLLECTION_CREATED,
PocketEventType.COLLECTION_UPDATED,
];

// See indices .docker/aws-resources/elasticsearch
export type CorpusItemIndex = {
meta: { _id: string; _index: string };
Expand All @@ -34,125 +52,13 @@ export type CorpusItemIndex = {
}>;
};

// See infrastructure/pocket-event-bridge/src/event-rules/corpus-events/eventConfig.ts
// and infrastructure/pocket-event-bridge/src/event-rules/collection-events/eventConfig.ts
export const validDetailTypes = [
'add-approved-item',
'update-approved-item',
'collection-created',
'collection-updated',
'remove-approved-item',
];

type Author = { name: string; sortOrder: number };

// Types below are all copied from:
// https://github.com/Pocket/content-monorepo/blob/7342cb5468f11fc0b3ffdddf8693b6aeeb64f26e/servers/curated-corpus-api/src/events/types.ts#L95
export type ApprovedItemPayload = {
eventType: string;
approvedItemExternalId: string;
url: string;
authors?: Author[];
title?: string | null;
excerpt?: string | null;
language?: string | null;
publisher?: string | null;
imageUrl?: string | null;
topic?: string | null;
createdAt?: string | null; // UTC timestamp string
createdBy?: string | null; // UTC timestamp string
updatedAt?: string | null; // UTC timestamp string
datePublished?: string; // UTC timestamp string
isSyndicated?: boolean;
isCollection?: boolean;
domainName?: string;
isTimeSensitive?: boolean;
source?: string | null;
grade?: string | null;
};

export type ValidLanguageApprovedItemPayload = Omit<
ApprovedItemPayload,
CorpusItemPayload,
'language'
> & { language: string };

// servers/shared-snowplow-consumer/src/eventConsumer/collectionEvents/types.ts
export type CollectionPayload = {
collection: {
externalId: string;
slug: string;
title: string;
status: string;
language: string;
authors: CollectionAuthor[];
stories: CollectionStory[];
createdAt: number; // in seconds
updatedAt: number; // in seconds

imageUrl?: string;
labels?: Label[];
intro?: string;
curationCategory?: CurationCategory;
excerpt?: string;
partnership?: CollectionPartnership;
publishedAt?: number; // in seconds
IABParentCategory?: IABParentCategory;
IABChildCategory?: IABChildCategory;
};
};

export type CollectionStoryAuthor = { name: string; sort_order: number };

export type CurationCategory = {
collection_curation_category_id: string;
name: string;
slug: string;
};

export type CollectionPartnership = {
collection_partnership_id: string;
name: string;
blurb: string;
image_url: string;
type: string;
url: string;
};

export type CollectionAuthor = {
collection_author_id: string;
name: string;
active: boolean;
slug?: string;
bio?: string;
image_url?: string;
};

export type CollectionStory = {
collection_story_id: string;
url: string;
title: string;
excerpt: string;
image_url?: string;
publisher?: string;
authors: CollectionStoryAuthor[];
is_from_partner: boolean;
sort_order?: number;
};

export type IABParentCategory = {
collection_iab_parent_category_id: string;
name: string;
slug: string;
};

export type IABChildCategory = {
collection_iab_child_category_id: string;
name: string;
slug: string;
> & {
language: string;
};

export type Label = { collection_label_id: string; name: string };

// Corpus items which are not collections, to be indexed
export type CollectionApprovedItemPayload = Omit<
ValidLanguageApprovedItemPayload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
},
"dependencies": {
"@aws-sdk/client-sagemaker-runtime": "3.679.0",
"@pocket-tools/event-bridge": "workspace:*",
"@pocket-tools/ts-logger": "workspace:*",
"@sentry/aws-serverless": "8.37.1",
"@smithy/node-http-handler": "3.2.0",
Expand All @@ -39,4 +40,4 @@
"tsconfig": "workspace:*",
"typescript": "5.6.3"
}
}
}
12 changes: 8 additions & 4 deletions lambdas/user-list-search-corpus-parser-hydration/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { parserRequest, parserResultToDoc } from './parserRequest';
import { bulkIndex } from './bulkIndex';
import { buildCollectionUrl, hasExcerptOrIsCollection } from './utils';
import { getEmbeddings } from './embeddingsRequest';
import { sqsEventBridgeEvent } from '@pocket-tools/event-bridge';

/**
* The main handler function which will be wrapped by Sentry prior to export.
Expand All @@ -29,15 +30,18 @@ export async function processor(event: SQSEvent): Promise<SQSBatchResponse> {

const validPayloads: Array<ValidLangEventPayload> = event.Records.map(
(record) => {
const message = JSON.parse(JSON.parse(record.body).Message);
const event = sqsEventBridgeEvent(record);
if (event == null || !validDetailTypes.includes(event['detail-type'])) {
return null;
}
return {
messageId: record.messageId,
detailType: message['detail-type'],
detail: message['detail'],
detailType: event['detail-type'],
detail: event['detail'],
};
},
)
.filter((message) => validDetailTypes.includes(message['detailType']))
.filter((message) => message != null)
.filter((message) => {
const language =
'collection' in message.detail
Expand Down
Loading

0 comments on commit 306f337

Please sign in to comment.