Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdp): Added legacy plugins worker #27835

Merged
merged 48 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7e02fc8
added initial work
benjackwhite Jan 23, 2025
99e5b72
Fix up new plugins
benjackwhite Jan 23, 2025
52792bf
Fixes
benjackwhite Jan 23, 2025
dfb9e0d
Fixes
benjackwhite Jan 23, 2025
6c2575a
Fixes
benjackwhite Jan 23, 2025
f7db6e5
Fix
benjackwhite Jan 23, 2025
835d8fb
Fixes
benjackwhite Jan 23, 2025
dcc8b2f
Fixes
benjackwhite Jan 23, 2025
18ef79b
Fix
benjackwhite Jan 23, 2025
9317b33
Fixes
benjackwhite Jan 27, 2025
e47eb3c
Fix
benjackwhite Jan 27, 2025
365c1b8
Added check for metrics production
benjackwhite Jan 27, 2025
20a304c
fixed frontend
benjackwhite Jan 27, 2025
693bf59
Fixes
benjackwhite Jan 27, 2025
4e8267d
Fix up
benjackwhite Jan 27, 2025
8d2f00b
Fix
benjackwhite Jan 27, 2025
e3a1ca9
Update query snapshots
github-actions[bot] Jan 27, 2025
9694f07
Update UI snapshots for `chromium` (1)
github-actions[bot] Jan 27, 2025
e33dd64
Fix storage
benjackwhite Jan 27, 2025
b7781cf
Update query snapshots
github-actions[bot] Jan 27, 2025
9895d15
Merge branch 'feat/cyclotron-plugins' of github.com:PostHog/posthog i…
benjackwhite Jan 27, 2025
9001d92
Fixes
benjackwhite Jan 27, 2025
1bbda52
Fixes
benjackwhite Jan 27, 2025
d573b20
Fixes
benjackwhite Jan 27, 2025
d054739
Update UI snapshots for `chromium` (1)
github-actions[bot] Jan 27, 2025
2ab4312
USe own types
benjackwhite Jan 27, 2025
3231131
Fixes
benjackwhite Jan 27, 2025
343551c
fix
benjackwhite Jan 27, 2025
c574e35
Fixes
benjackwhite Jan 27, 2025
e8665e5
Added migration commands
benjackwhite Jan 27, 2025
04ba772
Fixes
benjackwhite Jan 27, 2025
99987b2
Fix
benjackwhite Jan 27, 2025
2ffb989
Fix
benjackwhite Jan 27, 2025
406d7de
Fix migration
benjackwhite Jan 28, 2025
0460d3a
Fixes
benjackwhite Jan 28, 2025
e245047
Fixes
benjackwhite Jan 28, 2025
4e7ffe0
Fix
benjackwhite Jan 28, 2025
3af5796
Fixes
benjackwhite Jan 28, 2025
91b8ea5
Fixes
benjackwhite Jan 28, 2025
be04729
fix
benjackwhite Jan 28, 2025
a2d4450
Fixes
benjackwhite Jan 28, 2025
b0ba188
Fixes
benjackwhite Jan 28, 2025
2de9d40
Fixes
benjackwhite Jan 28, 2025
cde1d59
Fixes
benjackwhite Jan 28, 2025
1e954b2
Fixes
benjackwhite Jan 28, 2025
da407fd
Revert
benjackwhite Jan 28, 2025
ab8757b
Fixes
benjackwhite Jan 28, 2025
8ebb8a9
Remove old secrets
benjackwhite Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "",
"CDP_CYCLOTRON_ENABLED_TEAMS": "*",
"PLUGIN_SERVER_MODE": "all-v2"
},
"presentation": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,20 @@ export function HogFunctionConfiguration({
return <NotFound object="Hog function" />
}

const isLegacyPlugin = hogFunction?.template?.id?.startsWith('plugin-')

const headerButtons = (
<>
{!templateId && (
<>
<More
overlay={
<>
<LemonButton fullWidth onClick={() => duplicate()}>
Duplicate
</LemonButton>
{!isLegacyPlugin && (
<LemonButton fullWidth onClick={() => duplicate()}>
Duplicate
</LemonButton>
)}
<LemonDivider />
<LemonButton status="danger" fullWidth onClick={() => deleteHogFunction()}>
Delete
Expand Down Expand Up @@ -174,11 +178,12 @@ export function HogFunctionConfiguration({
)
const canEditSource =
displayOptions.canEditSource ??
['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type)
(['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type) && !isLegacyPlugin)
const showPersonsCount = displayOptions.showPersonsCount ?? ['broadcast'].includes(type)
const showTesting =
displayOptions.showTesting ??
['destination', 'internal_destination', 'transformation', 'broadcast', 'email'].includes(type)
(['destination', 'internal_destination', 'transformation', 'broadcast', 'email'].includes(type) &&
!isLegacyPlugin)

return (
<div className="space-y-3">
Expand Down Expand Up @@ -259,7 +264,12 @@ export function HogFunctionConfiguration({
<LemonTextArea disabled={loading} />
</LemonField>

{hogFunction?.template && !hogFunction.template.id.startsWith('template-blank-') ? (
{isLegacyPlugin ? (
<LemonBanner type="warning">
This destination is one of our legacy plugins. It will be deprecated and you
should instead upgrade
</LemonBanner>
) : hogFunction?.template && !hogFunction.template.id.startsWith('template-blank-') ? (
<LemonDropdown
showArrow
overlay={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ export function HogFunctionFilters(): JSX.Element {
return <HogFunctionFiltersInternal />
}

const showMasking = type === 'destination'
const isLegacyPlugin = configuration?.template?.id?.startsWith('plugin-')

const showMasking = type === 'destination' && !isLegacyPlugin
const showDropEvents = type === 'transformation'

return (
Expand Down
6 changes: 6 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpCyclotronWorker: true,
cdpCyclotronWorkerPlugins: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
Expand Down Expand Up @@ -139,6 +140,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpCyclotronWorker: true,
...sharedCapabilities,
}
case PluginServerMode.cdp_cyclotron_worker_plugins:
return {
cdpCyclotronWorkerPlugins: true,
...sharedCapabilities,
}
// This is only for functional tests, which time out if all capabilities are used
// ideally we'd run just the specific capability needed per test, but that's not easy to do atm
case PluginServerMode.functional_tests:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`CdpCyclotronWorkerPlugins onEvent should handle and collect errors 3`] = `
[
{
"key": "<REPLACED-UUID-0>",
"topic": "clickhouse_app_metrics2_test",
"value": {
"app_source": "hog_function",
"app_source_id": "<REPLACED-UUID-1>",
"count": 1,
"metric_kind": "failure",
"metric_name": "failed",
"team_id": 2,
"timestamp": "2025-01-01 00:00:00.000",
},
},
{
"key": "<REPLACED-UUID-1>",
"topic": "log_entries_test",
"value": {
"instance_id": "<REPLACED-UUID-2>",
"level": "debug",
"log_source": "hog_function",
"log_source_id": "<REPLACED-UUID-2>",
"message": "Executing plugin intercom",
"team_id": 2,
"timestamp": "2025-01-01 00:00:00.000",
},
},
{
"key": "<REPLACED-UUID-2>",
"topic": "log_entries_test",
"value": {
"instance_id": "<REPLACED-UUID-2>",
"level": "error",
"log_source": "hog_function",
"log_source_id": "<REPLACED-UUID-2>",
"message": "Plugin errored: Service is down, retry later",
"team_id": 2,
"timestamp": "2025-01-01 00:00:00.001",
},
},
]
`;

exports[`CdpCyclotronWorkerPlugins smoke tests should run the plugin: { name: 'customer-io', plugin: [Object] } 1`] = `
[
{
"level": "debug",
"message": "Executing plugin customer-io",
},
{
"level": "info",
"message": "Successfully authenticated with Customer.io. Completing setupPlugin.",
},
{
"level": "debug",
"message": "Detected email:, [email protected]",
},
{
"level": "debug",
"message": "{"status":{},"email":"[email protected]"}",
},
{
"level": "debug",
"message": "Should customer be tracked:, true",
},
{
"level": "debug",
"message": "Execution successful",
},
]
`;

exports[`CdpCyclotronWorkerPlugins smoke tests should run the plugin: { name: 'intercom', plugin: [Object] } 1`] = `
[
{
"level": "debug",
"message": "Executing plugin intercom",
},
{
"level": "info",
"message": "Contact [email protected] in Intercom not found",
},
{
"level": "debug",
"message": "Execution successful",
},
]
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import { ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { Response, trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { PLUGINS_BY_ID } from '../legacy-plugins'
import { LegacyPluginLogger, LegacyPluginMeta } from '../legacy-plugins/types'
import { sanitizeLogMessage } from '../services/hog-executor.service'
import { HogFunctionInvocation, HogFunctionInvocationResult, HogFunctionTypeType } from '../types'
import { CdpCyclotronWorker } from './cdp-cyclotron-worker.consumer'

type PluginState = {
setupPromise: Promise<any>
errored: boolean
meta: LegacyPluginMeta
}

/**
* NOTE: This is a consumer to take care of legacy plugins.
*/
export class CdpCyclotronWorkerPlugins extends CdpCyclotronWorker {
protected name = 'CdpCyclotronWorkerPlugins'
protected queue = 'plugin' as const
protected hogTypes: HogFunctionTypeType[] = ['destination']

private pluginState: Record<string, PluginState> = {}

public async processInvocations(invocations: HogFunctionInvocation[]): Promise<HogFunctionInvocationResult[]> {
return await this.runManyWithHeartbeat(invocations, (item) => this.executePluginInvocation(item))
}

public async fetch(...args: Parameters<typeof trackedFetch>): Promise<Response> {
return trackedFetch(...args)
}

public async executePluginInvocation(invocation: HogFunctionInvocation): Promise<HogFunctionInvocationResult> {
const result: HogFunctionInvocationResult = {
invocation,
finished: true,
capturedPostHogEvents: [],
logs: [],
}

const pluginId = invocation.hogFunction.template_id?.startsWith('plugin-')
? invocation.hogFunction.template_id.replace('plugin-', '')
: null

const isTestFunction = invocation.hogFunction.name.includes('[CDP-TEST-HIDDEN]')

result.logs.push({
level: 'debug',
timestamp: DateTime.now(),
message: `Executing plugin ${pluginId}`,
})
const plugin = pluginId ? PLUGINS_BY_ID[pluginId] : null

if (!plugin || !pluginId) {
result.error = new Error(`Plugin ${pluginId} not found`)
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Plugin ${pluginId} not found`,
})
return result
}

const addLog = (level: 'debug' | 'warn' | 'error' | 'info', ...args: any[]) => {
result.logs.push({
level,
timestamp: DateTime.now(),
message: sanitizeLogMessage(args),
})
}

const logger: LegacyPluginLogger = {
debug: (...args: any[]) => addLog('debug', ...args),
warn: (...args: any[]) => addLog('warn', ...args),
log: (...args: any[]) => addLog('info', ...args),
error: (...args: any[]) => addLog('error', ...args),
}

let state = this.pluginState[pluginId]

const fetch = (...args: Parameters<typeof trackedFetch>): Promise<Response> => {
if (isTestFunction) {
addLog('info', 'Fetch called but mocked due to test function')
return Promise.resolve({
status: 500,
json: () =>
Promise.resolve({
message: 'Test function',
}),
} as Response)
}
return this.fetch(...args)
}

if (!state) {
// TODO: Modify fetch to be a silent log if it is a test function...
const meta: LegacyPluginMeta = {
config: invocation.globals.inputs,
global: {},
fetch,
logger: logger,
}

state = this.pluginState[pluginId] = {
setupPromise: plugin.setupPlugin?.(meta) ?? Promise.resolve(),
meta,
errored: false,
}
}

try {
await state.setupPromise
benjackwhite marked this conversation as resolved.
Show resolved Hide resolved
} catch (e) {
state.errored = true
result.error = e
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Plugin ${pluginId} setup failed: ${e.message}`,
})
return result
}

// Convert the invocation into the right interface for the plugin

const event: ProcessedPluginEvent = {
distinct_id: invocation.globals.event.distinct_id,
ip: invocation.globals.event.properties.$ip,
team_id: invocation.hogFunction.team_id,
event: invocation.globals.event.event,
properties: invocation.globals.event.properties,
timestamp: invocation.globals.event.timestamp,
$set: invocation.globals.event.properties.$set,
$set_once: invocation.globals.event.properties.$set_once,
uuid: invocation.globals.event.uuid,
person: invocation.globals.person
? {
uuid: invocation.globals.person.id,
team_id: invocation.hogFunction.team_id,
properties: invocation.globals.person.properties,
created_at: '', // NOTE: We don't have this anymore - see if any plugin uses it...
}
: undefined,
}

try {
status.info('⚡️', 'Executing plugin', {
pluginId,
invocationId: invocation.id,
})
await plugin.onEvent?.(event, {
...state.meta,
// NOTE: We override logger and fetch here so we can track the calls
logger,
fetch,
})
result.logs.push({
level: 'debug',
timestamp: DateTime.now(),
message: `Execution successful`,
})
} catch (e) {
if (e instanceof RetryError) {
// NOTE: Schedule as a retry to cyclotron?
}

status.error('💩', 'Plugin errored', {
error: e,
pluginId,
invocationId: invocation.id,
})

result.error = e
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Plugin errored: ${e.message}`,
})
}

return result
}
}
Loading
Loading