Skip to content

Commit

Permalink
feat(cdp): Added legacy plugins worker (#27835)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored and adamleithp committed Jan 29, 2025
1 parent 3f41640 commit dbe871d
Show file tree
Hide file tree
Showing 30 changed files with 1,450 additions and 49 deletions.
2 changes: 0 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "",
"CDP_CYCLOTRON_ENABLED_TEAMS": "*",
"PLUGIN_SERVER_MODE": "all-v2"
},
"presentation": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,20 @@ export function HogFunctionConfiguration({
return <NotFound object="Hog function" />
}

const isLegacyPlugin = hogFunction?.template?.id?.startsWith('plugin-')

const headerButtons = (
<>
{!templateId && (
<>
<More
overlay={
<>
<LemonButton fullWidth onClick={() => duplicate()}>
Duplicate
</LemonButton>
{!isLegacyPlugin && (
<LemonButton fullWidth onClick={() => duplicate()}>
Duplicate
</LemonButton>
)}
<LemonDivider />
<LemonButton status="danger" fullWidth onClick={() => deleteHogFunction()}>
Delete
Expand Down Expand Up @@ -174,11 +178,12 @@ export function HogFunctionConfiguration({
)
const canEditSource =
displayOptions.canEditSource ??
['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type)
(['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type) && !isLegacyPlugin)
const showPersonsCount = displayOptions.showPersonsCount ?? ['broadcast'].includes(type)
const showTesting =
displayOptions.showTesting ??
['destination', 'internal_destination', 'transformation', 'broadcast', 'email'].includes(type)
(['destination', 'internal_destination', 'transformation', 'broadcast', 'email'].includes(type) &&
!isLegacyPlugin)

return (
<div className="space-y-3">
Expand Down Expand Up @@ -259,7 +264,12 @@ export function HogFunctionConfiguration({
<LemonTextArea disabled={loading} />
</LemonField>

{hogFunction?.template && !hogFunction.template.id.startsWith('template-blank-') ? (
{isLegacyPlugin ? (
<LemonBanner type="warning">
This destination is one of our legacy plugins. It will be deprecated and you
should instead upgrade
</LemonBanner>
) : hogFunction?.template && !hogFunction.template.id.startsWith('template-blank-') ? (
<LemonDropdown
showArrow
overlay={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ export function HogFunctionFilters(): JSX.Element {
return <HogFunctionFiltersInternal />
}

const showMasking = type === 'destination'
const isLegacyPlugin = configuration?.template?.id?.startsWith('plugin-')

const showMasking = type === 'destination' && !isLegacyPlugin
const showDropEvents = type === 'transformation'

return (
Expand Down
6 changes: 6 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpCyclotronWorker: true,
cdpCyclotronWorkerPlugins: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
Expand Down Expand Up @@ -139,6 +140,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpCyclotronWorker: true,
...sharedCapabilities,
}
case PluginServerMode.cdp_cyclotron_worker_plugins:
return {
cdpCyclotronWorkerPlugins: true,
...sharedCapabilities,
}
// This is only for functional tests, which time out if all capabilities are used
// ideally we'd run just the specific capability needed per test, but that's not easy to do atm
case PluginServerMode.functional_tests:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`CdpCyclotronWorkerPlugins onEvent should handle and collect errors 3`] = `
[
{
"key": "<REPLACED-UUID-0>",
"topic": "clickhouse_app_metrics2_test",
"value": {
"app_source": "hog_function",
"app_source_id": "<REPLACED-UUID-1>",
"count": 1,
"metric_kind": "failure",
"metric_name": "failed",
"team_id": 2,
"timestamp": "2025-01-01 00:00:00.000",
},
},
{
"key": "<REPLACED-UUID-1>",
"topic": "log_entries_test",
"value": {
"instance_id": "<REPLACED-UUID-2>",
"level": "debug",
"log_source": "hog_function",
"log_source_id": "<REPLACED-UUID-2>",
"message": "Executing plugin intercom",
"team_id": 2,
"timestamp": "2025-01-01 00:00:00.000",
},
},
{
"key": "<REPLACED-UUID-2>",
"topic": "log_entries_test",
"value": {
"instance_id": "<REPLACED-UUID-2>",
"level": "error",
"log_source": "hog_function",
"log_source_id": "<REPLACED-UUID-2>",
"message": "Plugin errored: Service is down, retry later",
"team_id": 2,
"timestamp": "2025-01-01 00:00:00.001",
},
},
]
`;

exports[`CdpCyclotronWorkerPlugins smoke tests should run the plugin: { name: 'customer-io', plugin: [Object] } 1`] = `
[
{
"level": "debug",
"message": "Executing plugin customer-io",
},
{
"level": "info",
"message": "Successfully authenticated with Customer.io. Completing setupPlugin.",
},
{
"level": "debug",
"message": "Detected email:, [email protected]",
},
{
"level": "debug",
"message": "{"status":{},"email":"test@posthog.com"}",
},
{
"level": "debug",
"message": "Should customer be tracked:, true",
},
{
"level": "debug",
"message": "Execution successful",
},
]
`;

exports[`CdpCyclotronWorkerPlugins smoke tests should run the plugin: { name: 'intercom', plugin: [Object] } 1`] = `
[
{
"level": "debug",
"message": "Executing plugin intercom",
},
{
"level": "info",
"message": "Contact [email protected] in Intercom not found",
},
{
"level": "debug",
"message": "Execution successful",
},
]
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import { ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { Response, trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { PLUGINS_BY_ID } from '../legacy-plugins'
import { LegacyPluginLogger, LegacyPluginMeta } from '../legacy-plugins/types'
import { sanitizeLogMessage } from '../services/hog-executor.service'
import { HogFunctionInvocation, HogFunctionInvocationResult, HogFunctionTypeType } from '../types'
import { CdpCyclotronWorker } from './cdp-cyclotron-worker.consumer'

type PluginState = {
setupPromise: Promise<any>
errored: boolean
meta: LegacyPluginMeta
}

/**
* NOTE: This is a consumer to take care of legacy plugins.
*/
export class CdpCyclotronWorkerPlugins extends CdpCyclotronWorker {
protected name = 'CdpCyclotronWorkerPlugins'
protected queue = 'plugin' as const
protected hogTypes: HogFunctionTypeType[] = ['destination']

private pluginState: Record<string, PluginState> = {}

public async processInvocations(invocations: HogFunctionInvocation[]): Promise<HogFunctionInvocationResult[]> {
return await this.runManyWithHeartbeat(invocations, (item) => this.executePluginInvocation(item))
}

public async fetch(...args: Parameters<typeof trackedFetch>): Promise<Response> {
return trackedFetch(...args)
}

public async executePluginInvocation(invocation: HogFunctionInvocation): Promise<HogFunctionInvocationResult> {
const result: HogFunctionInvocationResult = {
invocation,
finished: true,
capturedPostHogEvents: [],
logs: [],
}

const pluginId = invocation.hogFunction.template_id?.startsWith('plugin-')
? invocation.hogFunction.template_id.replace('plugin-', '')
: null

const isTestFunction = invocation.hogFunction.name.includes('[CDP-TEST-HIDDEN]')

result.logs.push({
level: 'debug',
timestamp: DateTime.now(),
message: `Executing plugin ${pluginId}`,
})
const plugin = pluginId ? PLUGINS_BY_ID[pluginId] : null

if (!plugin || !pluginId) {
result.error = new Error(`Plugin ${pluginId} not found`)
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Plugin ${pluginId} not found`,
})
return result
}

const addLog = (level: 'debug' | 'warn' | 'error' | 'info', ...args: any[]) => {
result.logs.push({
level,
timestamp: DateTime.now(),
message: sanitizeLogMessage(args),
})
}

const logger: LegacyPluginLogger = {
debug: (...args: any[]) => addLog('debug', ...args),
warn: (...args: any[]) => addLog('warn', ...args),
log: (...args: any[]) => addLog('info', ...args),
error: (...args: any[]) => addLog('error', ...args),
}

let state = this.pluginState[pluginId]

const fetch = (...args: Parameters<typeof trackedFetch>): Promise<Response> => {
if (isTestFunction) {
addLog('info', 'Fetch called but mocked due to test function')
return Promise.resolve({
status: 500,
json: () =>
Promise.resolve({
message: 'Test function',
}),
} as Response)
}
return this.fetch(...args)
}

if (!state) {
// TODO: Modify fetch to be a silent log if it is a test function...
const meta: LegacyPluginMeta = {
config: invocation.globals.inputs,
global: {},
fetch,
logger: logger,
}

state = this.pluginState[pluginId] = {
setupPromise: plugin.setupPlugin?.(meta) ?? Promise.resolve(),
meta,
errored: false,
}
}

try {
await state.setupPromise
} catch (e) {
state.errored = true
result.error = e
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Plugin ${pluginId} setup failed: ${e.message}`,
})
return result
}

// Convert the invocation into the right interface for the plugin

const event: ProcessedPluginEvent = {
distinct_id: invocation.globals.event.distinct_id,
ip: invocation.globals.event.properties.$ip,
team_id: invocation.hogFunction.team_id,
event: invocation.globals.event.event,
properties: invocation.globals.event.properties,
timestamp: invocation.globals.event.timestamp,
$set: invocation.globals.event.properties.$set,
$set_once: invocation.globals.event.properties.$set_once,
uuid: invocation.globals.event.uuid,
person: invocation.globals.person
? {
uuid: invocation.globals.person.id,
team_id: invocation.hogFunction.team_id,
properties: invocation.globals.person.properties,
created_at: '', // NOTE: We don't have this anymore - see if any plugin uses it...
}
: undefined,
}

try {
status.info('⚡️', 'Executing plugin', {
pluginId,
invocationId: invocation.id,
})
await plugin.onEvent?.(event, {
...state.meta,
// NOTE: We override logger and fetch here so we can track the calls
logger,
fetch,
})
result.logs.push({
level: 'debug',
timestamp: DateTime.now(),
message: `Execution successful`,
})
} catch (e) {
if (e instanceof RetryError) {
// NOTE: Schedule as a retry to cyclotron?
}

status.error('💩', 'Plugin errored', {
error: e,
pluginId,
invocationId: invocation.id,
})

result.error = e
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Plugin errored: ${e.message}`,
})
}

return result
}
}
Loading

0 comments on commit dbe871d

Please sign in to comment.