@@ -259,7 +264,12 @@ export function HogFunctionConfiguration({
- {hogFunction?.template && !hogFunction.template.id.startsWith('template-blank-') ? (
+ {isLegacyPlugin ? (
+
+ This destination is one of our legacy plugins. It will be deprecated and you
+ should instead upgrade
+
+ ) : hogFunction?.template && !hogFunction.template.id.startsWith('template-blank-') ? (
}
- const showMasking = type === 'destination'
+ const isLegacyPlugin = configuration?.template?.id?.startsWith('plugin-')
+
+ const showMasking = type === 'destination' && !isLegacyPlugin
const showDropEvents = type === 'transformation'
return (
diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts
index bb7f34eb39163..55aa5ed25cc4c 100644
--- a/plugin-server/src/capabilities.ts
+++ b/plugin-server/src/capabilities.ts
@@ -26,6 +26,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpCyclotronWorker: true,
+ cdpCyclotronWorkerPlugins: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
@@ -139,6 +140,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpCyclotronWorker: true,
...sharedCapabilities,
}
+ case PluginServerMode.cdp_cyclotron_worker_plugins:
+ return {
+ cdpCyclotronWorkerPlugins: true,
+ ...sharedCapabilities,
+ }
// This is only for functional tests, which time out if all capabilities are used
// ideally we'd run just the specific capability needed per test, but that's not easy to do atm
case PluginServerMode.functional_tests:
diff --git a/plugin-server/src/cdp/consumers/__snapshots__/cdp-cyclotron-plugins-worker.test.ts.snap b/plugin-server/src/cdp/consumers/__snapshots__/cdp-cyclotron-plugins-worker.test.ts.snap
new file mode 100644
index 0000000000000..1211a43ef7591
--- /dev/null
+++ b/plugin-server/src/cdp/consumers/__snapshots__/cdp-cyclotron-plugins-worker.test.ts.snap
@@ -0,0 +1,91 @@
+// Jest Snapshot v1, https://goo.gl/fbAQLP
+
+exports[`CdpCyclotronWorkerPlugins onEvent should handle and collect errors 3`] = `
+[
+ {
+ "key": "
",
+ "topic": "clickhouse_app_metrics2_test",
+ "value": {
+ "app_source": "hog_function",
+ "app_source_id": "",
+ "count": 1,
+ "metric_kind": "failure",
+ "metric_name": "failed",
+ "team_id": 2,
+ "timestamp": "2025-01-01 00:00:00.000",
+ },
+ },
+ {
+ "key": "",
+ "topic": "log_entries_test",
+ "value": {
+ "instance_id": "",
+ "level": "debug",
+ "log_source": "hog_function",
+ "log_source_id": "",
+ "message": "Executing plugin intercom",
+ "team_id": 2,
+ "timestamp": "2025-01-01 00:00:00.000",
+ },
+ },
+ {
+ "key": "",
+ "topic": "log_entries_test",
+ "value": {
+ "instance_id": "",
+ "level": "error",
+ "log_source": "hog_function",
+ "log_source_id": "",
+ "message": "Plugin errored: Service is down, retry later",
+ "team_id": 2,
+ "timestamp": "2025-01-01 00:00:00.001",
+ },
+ },
+]
+`;
+
+exports[`CdpCyclotronWorkerPlugins smoke tests should run the plugin: { name: 'customer-io', plugin: [Object] } 1`] = `
+[
+ {
+ "level": "debug",
+ "message": "Executing plugin customer-io",
+ },
+ {
+ "level": "info",
+ "message": "Successfully authenticated with Customer.io. Completing setupPlugin.",
+ },
+ {
+ "level": "debug",
+ "message": "Detected email:, test@posthog.com",
+ },
+ {
+ "level": "debug",
+ "message": "{"status":{},"email":"test@posthog.com"}",
+ },
+ {
+ "level": "debug",
+ "message": "Should customer be tracked:, true",
+ },
+ {
+ "level": "debug",
+ "message": "Execution successful",
+ },
+]
+`;
+
+exports[`CdpCyclotronWorkerPlugins smoke tests should run the plugin: { name: 'intercom', plugin: [Object] } 1`] = `
+[
+ {
+ "level": "debug",
+ "message": "Executing plugin intercom",
+ },
+ {
+ "level": "info",
+ "message": "Contact test@posthog.com in Intercom not found",
+ },
+ {
+ "level": "debug",
+ "message": "Execution successful",
+ },
+]
+`;
diff --git a/plugin-server/src/cdp/consumers/cdp-cyclotron-plugins-worker.consumer.ts b/plugin-server/src/cdp/consumers/cdp-cyclotron-plugins-worker.consumer.ts
new file mode 100644
index 0000000000000..877451bc4417a
--- /dev/null
+++ b/plugin-server/src/cdp/consumers/cdp-cyclotron-plugins-worker.consumer.ts
@@ -0,0 +1,186 @@
+import { ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold'
+import { DateTime } from 'luxon'
+
+import { Response, trackedFetch } from '../../utils/fetch'
+import { status } from '../../utils/status'
+import { PLUGINS_BY_ID } from '../legacy-plugins'
+import { LegacyPluginLogger, LegacyPluginMeta } from '../legacy-plugins/types'
+import { sanitizeLogMessage } from '../services/hog-executor.service'
+import { HogFunctionInvocation, HogFunctionInvocationResult, HogFunctionTypeType } from '../types'
+import { CdpCyclotronWorker } from './cdp-cyclotron-worker.consumer'
+
+type PluginState = {
+ setupPromise: Promise
+ errored: boolean
+ meta: LegacyPluginMeta
+}
+
+/**
+ * NOTE: This is a consumer to take care of legacy plugins.
+ */
+export class CdpCyclotronWorkerPlugins extends CdpCyclotronWorker {
+ protected name = 'CdpCyclotronWorkerPlugins'
+ protected queue = 'plugin' as const
+ protected hogTypes: HogFunctionTypeType[] = ['destination']
+
+ private pluginState: Record = {}
+
+ public async processInvocations(invocations: HogFunctionInvocation[]): Promise {
+ return await this.runManyWithHeartbeat(invocations, (item) => this.executePluginInvocation(item))
+ }
+
+ public async fetch(...args: Parameters): Promise {
+ return trackedFetch(...args)
+ }
+
+ public async executePluginInvocation(invocation: HogFunctionInvocation): Promise {
+ const result: HogFunctionInvocationResult = {
+ invocation,
+ finished: true,
+ capturedPostHogEvents: [],
+ logs: [],
+ }
+
+ const pluginId = invocation.hogFunction.template_id?.startsWith('plugin-')
+ ? invocation.hogFunction.template_id.replace('plugin-', '')
+ : null
+
+ const isTestFunction = invocation.hogFunction.name.includes('[CDP-TEST-HIDDEN]')
+
+ result.logs.push({
+ level: 'debug',
+ timestamp: DateTime.now(),
+ message: `Executing plugin ${pluginId}`,
+ })
+ const plugin = pluginId ? PLUGINS_BY_ID[pluginId] : null
+
+ if (!plugin || !pluginId) {
+ result.error = new Error(`Plugin ${pluginId} not found`)
+ result.logs.push({
+ level: 'error',
+ timestamp: DateTime.now(),
+ message: `Plugin ${pluginId} not found`,
+ })
+ return result
+ }
+
+ const addLog = (level: 'debug' | 'warn' | 'error' | 'info', ...args: any[]) => {
+ result.logs.push({
+ level,
+ timestamp: DateTime.now(),
+ message: sanitizeLogMessage(args),
+ })
+ }
+
+ const logger: LegacyPluginLogger = {
+ debug: (...args: any[]) => addLog('debug', ...args),
+ warn: (...args: any[]) => addLog('warn', ...args),
+ log: (...args: any[]) => addLog('info', ...args),
+ error: (...args: any[]) => addLog('error', ...args),
+ }
+
+ let state = this.pluginState[pluginId]
+
+ const fetch = (...args: Parameters): Promise => {
+ if (isTestFunction) {
+ addLog('info', 'Fetch called but mocked due to test function')
+ return Promise.resolve({
+ status: 500,
+ json: () =>
+ Promise.resolve({
+ message: 'Test function',
+ }),
+ } as Response)
+ }
+ return this.fetch(...args)
+ }
+
+ if (!state) {
+ // TODO: Modify fetch to be a silent log if it is a test function...
+ const meta: LegacyPluginMeta = {
+ config: invocation.globals.inputs,
+ global: {},
+ fetch,
+ logger: logger,
+ }
+
+ state = this.pluginState[pluginId] = {
+ setupPromise: plugin.setupPlugin?.(meta) ?? Promise.resolve(),
+ meta,
+ errored: false,
+ }
+ }
+
+ try {
+ await state.setupPromise
+ } catch (e) {
+ state.errored = true
+ result.error = e
+ result.logs.push({
+ level: 'error',
+ timestamp: DateTime.now(),
+ message: `Plugin ${pluginId} setup failed: ${e.message}`,
+ })
+ return result
+ }
+
+ // Convert the invocation into the right interface for the plugin
+
+ const event: ProcessedPluginEvent = {
+ distinct_id: invocation.globals.event.distinct_id,
+ ip: invocation.globals.event.properties.$ip,
+ team_id: invocation.hogFunction.team_id,
+ event: invocation.globals.event.event,
+ properties: invocation.globals.event.properties,
+ timestamp: invocation.globals.event.timestamp,
+ $set: invocation.globals.event.properties.$set,
+ $set_once: invocation.globals.event.properties.$set_once,
+ uuid: invocation.globals.event.uuid,
+ person: invocation.globals.person
+ ? {
+ uuid: invocation.globals.person.id,
+ team_id: invocation.hogFunction.team_id,
+ properties: invocation.globals.person.properties,
+ created_at: '', // NOTE: We don't have this anymore - see if any plugin uses it...
+ }
+ : undefined,
+ }
+
+ try {
+ status.info('⚡️', 'Executing plugin', {
+ pluginId,
+ invocationId: invocation.id,
+ })
+ await plugin.onEvent?.(event, {
+ ...state.meta,
+ // NOTE: We override logger and fetch here so we can track the calls
+ logger,
+ fetch,
+ })
+ result.logs.push({
+ level: 'debug',
+ timestamp: DateTime.now(),
+ message: `Execution successful`,
+ })
+ } catch (e) {
+ if (e instanceof RetryError) {
+ // NOTE: Schedule as a retry to cyclotron?
+ }
+
+ status.error('💩', 'Plugin errored', {
+ error: e,
+ pluginId,
+ invocationId: invocation.id,
+ })
+
+ result.error = e
+ result.logs.push({
+ level: 'error',
+ timestamp: DateTime.now(),
+ message: `Plugin errored: ${e.message}`,
+ })
+ }
+
+ return result
+ }
+}
diff --git a/plugin-server/src/cdp/consumers/cdp-cyclotron-plugins-worker.test.ts b/plugin-server/src/cdp/consumers/cdp-cyclotron-plugins-worker.test.ts
new file mode 100644
index 0000000000000..8d501a550c182
--- /dev/null
+++ b/plugin-server/src/cdp/consumers/cdp-cyclotron-plugins-worker.test.ts
@@ -0,0 +1,323 @@
+import { DateTime } from 'luxon'
+
+import {
+ createHogExecutionGlobals,
+ createInvocation,
+ insertHogFunction as _insertHogFunction,
+} from '~/tests/cdp/fixtures'
+import { getProducedKafkaMessages, getProducedKafkaMessagesForTopic } from '~/tests/helpers/mocks/producer.mock'
+import { forSnapshot } from '~/tests/helpers/snapshots'
+import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql'
+
+import { Hub, Team } from '../../types'
+import { closeHub, createHub } from '../../utils/db/hub'
+import { PLUGINS_BY_ID } from '../legacy-plugins'
+import { HogFunctionInvocationGlobalsWithInputs, HogFunctionType } from '../types'
+import { CdpCyclotronWorkerPlugins } from './cdp-cyclotron-plugins-worker.consumer'
+
+jest.setTimeout(1000)
+
+/**
+ * NOTE: The internal and normal events consumers are very similar so we can test them together
+ */
+describe('CdpCyclotronWorkerPlugins', () => {
+ let processor: CdpCyclotronWorkerPlugins
+ let hub: Hub
+ let team: Team
+ let fn: HogFunctionType
+ let globals: HogFunctionInvocationGlobalsWithInputs
+ let mockFetch: jest.Mock
+ const insertHogFunction = async (hogFunction: Partial) => {
+ const item = await _insertHogFunction(hub.postgres, team.id, {
+ ...hogFunction,
+ type: 'destination',
+ })
+ // Trigger the reload that django would do
+ await processor.hogFunctionManager.reloadAllHogFunctions()
+ return item
+ }
+
+ beforeEach(async () => {
+ await resetTestDatabase()
+ hub = await createHub()
+
+ team = await getFirstTeam(hub)
+ processor = new CdpCyclotronWorkerPlugins(hub)
+
+ await processor.start()
+
+ processor.fetch = mockFetch = jest.fn(() =>
+ Promise.resolve({
+ status: 200,
+ json: () =>
+ Promise.resolve({
+ status: 200,
+ }),
+ } as any)
+ )
+
+ jest.spyOn(processor['cyclotronWorker']!, 'updateJob').mockImplementation(() => {})
+ jest.spyOn(processor['cyclotronWorker']!, 'releaseJob').mockImplementation(() => Promise.resolve())
+
+ const fixedTime = DateTime.fromObject({ year: 2025, month: 1, day: 1 }, { zone: 'UTC' })
+ jest.spyOn(Date, 'now').mockReturnValue(fixedTime.toMillis())
+
+ fn = await insertHogFunction({
+ name: 'Plugin test',
+ template_id: 'plugin-intercom',
+ })
+ globals = {
+ ...createHogExecutionGlobals({
+ project: {
+ id: team.id,
+ } as any,
+ event: {
+ uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
+ event: '$pageview',
+ properties: {
+ $current_url: 'https://posthog.com',
+ $lib_version: '1.0.0',
+ $set: {
+ email: 'test@posthog.com',
+ },
+ },
+ timestamp: fixedTime.toISO(),
+ } as any,
+ }),
+ inputs: {
+ intercomApiKey: '1234567890',
+ triggeringEvents: '$identify,mycustomevent',
+ ignoredEmailDomains: 'dev.posthog.com',
+ useEuropeanDataStorage: 'No',
+ },
+ }
+ })
+
+ afterEach(async () => {
+ jest.setTimeout(10000)
+ await processor.stop()
+ await closeHub(hub)
+ })
+
+ afterAll(() => {
+ jest.useRealTimers()
+ })
+
+ describe('setupPlugin', () => {
+ it('should setup a plugin on first call', async () => {
+ jest.spyOn(PLUGINS_BY_ID['intercom'] as any, 'setupPlugin')
+
+ const results = processor.processBatch([
+ createInvocation(fn, globals),
+ createInvocation(fn, globals),
+ createInvocation(fn, globals),
+ ])
+
+ expect(await results).toMatchObject([{ finished: true }, { finished: true }, { finished: true }])
+
+ expect(PLUGINS_BY_ID['intercom'].setupPlugin).toHaveBeenCalledTimes(1)
+ expect(jest.mocked(PLUGINS_BY_ID['intercom'].setupPlugin!).mock.calls[0][0]).toMatchInlineSnapshot(`
+ {
+ "config": {
+ "ignoredEmailDomains": "dev.posthog.com",
+ "intercomApiKey": "1234567890",
+ "triggeringEvents": "$identify,mycustomevent",
+ "useEuropeanDataStorage": "No",
+ },
+ "fetch": [Function],
+ "global": {},
+ "logger": {
+ "debug": [Function],
+ "error": [Function],
+ "log": [Function],
+ "warn": [Function],
+ },
+ }
+ `)
+ })
+ })
+
+ describe('onEvent', () => {
+ it('should call the plugin onEvent method', async () => {
+ jest.spyOn(PLUGINS_BY_ID['intercom'] as any, 'onEvent')
+
+ const invocation = createInvocation(fn, globals)
+ invocation.globals.event.event = 'mycustomevent'
+ invocation.globals.event.properties = {
+ email: 'test@posthog.com',
+ }
+
+ mockFetch.mockResolvedValue({
+ status: 200,
+ json: () => Promise.resolve({ total_count: 1 }),
+ })
+
+ await processor.processBatch([invocation])
+
+ expect(PLUGINS_BY_ID['intercom'].onEvent).toHaveBeenCalledTimes(1)
+ expect(forSnapshot(jest.mocked(PLUGINS_BY_ID['intercom'].onEvent!).mock.calls[0][0]))
+ .toMatchInlineSnapshot(`
+ {
+ "distinct_id": "distinct_id",
+ "event": "mycustomevent",
+ "person": {
+ "created_at": "",
+ "properties": {
+ "email": "test@posthog.com",
+ "first_name": "Pumpkin",
+ },
+ "team_id": 2,
+ "uuid": "uuid",
+ },
+ "properties": {
+ "email": "test@posthog.com",
+ },
+ "team_id": 2,
+ "timestamp": "2025-01-01T00:00:00.000Z",
+ "uuid": "",
+ }
+ `)
+
+ expect(mockFetch).toHaveBeenCalledTimes(2)
+ expect(forSnapshot(mockFetch.mock.calls[0])).toMatchInlineSnapshot(`
+ [
+ "https://api.intercom.io/contacts/search",
+ {
+ "body": "{"query":{"field":"email","operator":"=","value":"test@posthog.com"}}",
+ "headers": {
+ "Accept": "application/json",
+ "Authorization": "Bearer 1234567890",
+ "Content-Type": "application/json",
+ },
+ "method": "POST",
+ },
+ ]
+ `)
+ expect(forSnapshot(mockFetch.mock.calls[1])).toMatchInlineSnapshot(`
+ [
+ "https://api.intercom.io/events",
+ {
+ "body": "{"event_name":"mycustomevent","created_at":null,"email":"test@posthog.com","id":"distinct_id"}",
+ "headers": {
+ "Accept": "application/json",
+ "Authorization": "Bearer 1234567890",
+ "Content-Type": "application/json",
+ },
+ "method": "POST",
+ },
+ ]
+ `)
+
+ expect(forSnapshot(jest.mocked(processor['cyclotronWorker']!.updateJob).mock.calls)).toMatchInlineSnapshot(`
+ [
+ [
+ "",
+ "completed",
+ ],
+ ]
+ `)
+ })
+
+ it('should mock out fetch if it is a test function', async () => {
+ jest.spyOn(PLUGINS_BY_ID['intercom'] as any, 'onEvent')
+
+ const invocation = createInvocation(fn, globals)
+ invocation.hogFunction.name = 'My function [CDP-TEST-HIDDEN]'
+ invocation.globals.event.event = 'mycustomevent'
+ invocation.globals.event.properties = {
+ email: 'test@posthog.com',
+ }
+
+ await processor.processBatch([invocation])
+
+ expect(mockFetch).toHaveBeenCalledTimes(0)
+
+ expect(PLUGINS_BY_ID['intercom'].onEvent).toHaveBeenCalledTimes(1)
+
+ expect(forSnapshot(getProducedKafkaMessagesForTopic('log_entries_test').map((m) => m.value.message)))
+ .toMatchInlineSnapshot(`
+ [
+ "Executing plugin intercom",
+ "Fetch called but mocked due to test function",
+ "Unable to search contact test@posthog.com in Intercom. Status Code: undefined. Error message: ",
+ "Execution successful",
+ ]
+ `)
+ })
+
+ it('should handle and collect errors', async () => {
+ jest.spyOn(PLUGINS_BY_ID['intercom'] as any, 'onEvent')
+
+ const invocation = createInvocation(fn, globals)
+ invocation.globals.event.event = 'mycustomevent'
+ invocation.globals.event.properties = {
+ email: 'test@posthog.com',
+ }
+
+ mockFetch.mockRejectedValue(new Error('Test error'))
+
+ const res = await processor.processBatch([invocation])
+
+ expect(PLUGINS_BY_ID['intercom'].onEvent).toHaveBeenCalledTimes(1)
+
+ expect(res[0].error).toBeInstanceOf(Error)
+ expect(forSnapshot(res[0].logs)).toMatchInlineSnapshot(`[]`)
+
+ expect(forSnapshot(jest.mocked(processor['cyclotronWorker']!.updateJob).mock.calls)).toMatchInlineSnapshot(`
+ [
+ [
+ "",
+ "failed",
+ ],
+ ]
+ `)
+
+ expect(forSnapshot(getProducedKafkaMessages())).toMatchSnapshot()
+ })
+ })
+
+ describe('smoke tests', () => {
+ const testCases = Object.entries(PLUGINS_BY_ID).map(([pluginId, plugin]) => ({
+ name: pluginId,
+ plugin,
+ }))
+
+ it.each(testCases)('should run the plugin: %s', async ({ name, plugin }) => {
+ globals.event.event = '$identify' // Many plugins filter for this
+ const invocation = createInvocation(fn, globals)
+
+ invocation.hogFunction.template_id = `plugin-${plugin.id}`
+
+ const inputs: Record = {}
+
+ for (const input of plugin.metadata.config) {
+ if (!input.key) {
+ continue
+ }
+
+ if (input.default) {
+ inputs[input.key] = input.default
+ continue
+ }
+
+ if (input.type === 'choice') {
+ inputs[input.key] = input.choices[0]
+ } else if (input.type === 'string') {
+ inputs[input.key] = 'test'
+ }
+ }
+
+ invocation.hogFunction.name = name
+ await processor.processBatch([invocation])
+
+ expect(
+ forSnapshot(
+ getProducedKafkaMessagesForTopic('log_entries_test').map((m) => ({
+ message: m.value.message,
+ level: m.value.level,
+ }))
+ )
+ ).toMatchSnapshot()
+ })
+ })
+})
diff --git a/plugin-server/src/cdp/consumers/cdp-cyclotron-worker.consumer.ts b/plugin-server/src/cdp/consumers/cdp-cyclotron-worker.consumer.ts
index 89e752d9cb7f3..a43930ec547f8 100644
--- a/plugin-server/src/cdp/consumers/cdp-cyclotron-worker.consumer.ts
+++ b/plugin-server/src/cdp/consumers/cdp-cyclotron-worker.consumer.ts
@@ -13,37 +13,31 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
protected name = 'CdpCyclotronWorker'
private cyclotronWorker?: CyclotronWorker
private runningWorker: Promise | undefined
- protected queue: 'hog' | 'fetch' = 'hog'
+ protected queue: 'hog' | 'fetch' | 'plugin' = 'hog'
protected hogTypes: HogFunctionTypeType[] = ['destination', 'internal_destination']
- public async processBatch(invocations: HogFunctionInvocation[]): Promise {
+ public async processInvocations(invocations: HogFunctionInvocation[]): Promise {
+ return await this.runManyWithHeartbeat(invocations, (item) => this.hogExecutor.execute(item))
+ }
+
+ public async processBatch(invocations: HogFunctionInvocation[]): Promise {
if (!invocations.length) {
- return
+ return []
}
const invocationResults = await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeInvocations`,
- func: async () => {
- // NOTE: this service will never do fetching (unless we decide we want to do it in node at some point, its only used for e2e testing)
- // fetchExecutor would use rusty-hook to send a fetch request but thats no longer the case
- // we are currentyl going to execute the fetch locally for testing purposes
- // as nothing should ever land on the deprecated fetch queue this should be safe.
- const fetchQueue = invocations.filter((item) => item.queue === 'fetch')
- const fetchResults = await this.runManyWithHeartbeat(fetchQueue, (item) =>
- this.fetchExecutor.execute(item)
- )
- const hogQueue = invocations.filter((item) => item.queue === 'hog')
- const hogResults = await this.runManyWithHeartbeat(hogQueue, (item) => this.hogExecutor.execute(item))
- return [...hogResults, ...(fetchResults.filter(Boolean) as HogFunctionInvocationResult[])]
- },
+ func: async () => await this.processInvocations(invocations),
})
await this.processInvocationResults(invocationResults)
await this.updateJobs(invocationResults)
await this.produceQueuedMessages()
+
+ return invocationResults
}
- private async updateJobs(invocations: HogFunctionInvocationResult[]) {
+ protected async updateJobs(invocations: HogFunctionInvocationResult[]) {
await Promise.all(
invocations.map((item) => {
if (item.invocation.queue === 'fetch') {
@@ -140,4 +134,11 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
export class CdpCyclotronWorkerFetch extends CdpCyclotronWorker {
protected name = 'CdpCyclotronWorkerFetch'
protected queue = 'fetch' as const
+
+ public async processInvocations(invocations: HogFunctionInvocation[]): Promise {
+ // NOTE: this service will never do fetching (unless we decide we want to do it in node at some point, its only used for e2e testing)
+ return (await this.runManyWithHeartbeat(invocations, (item) => this.fetchExecutor.execute(item))).filter(
+ Boolean
+ ) as HogFunctionInvocationResult[]
+ }
}
diff --git a/plugin-server/src/cdp/consumers/cdp-function-callback.consumer.ts b/plugin-server/src/cdp/consumers/cdp-function-callback.consumer.ts
deleted file mode 100644
index e69de29bb2d1d..0000000000000
diff --git a/plugin-server/src/cdp/consumers/cdp-processed-events.consumer.ts b/plugin-server/src/cdp/consumers/cdp-processed-events.consumer.ts
index 52b6e558c41e6..67b434c9979b2 100644
--- a/plugin-server/src/cdp/consumers/cdp-processed-events.consumer.ts
+++ b/plugin-server/src/cdp/consumers/cdp-processed-events.consumer.ts
@@ -41,7 +41,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
return {
teamId: item.globals.project.id,
functionId: item.hogFunction.id,
- queueName: 'hog',
+ queueName: item.hogFunction.template_id?.startsWith('plugin-') ? 'plugin' : 'hog',
priority: item.priority,
vmState: serializeHogFunctionInvocation(item),
}
diff --git a/plugin-server/src/cdp/legacy-plugins/customerio/index.ts b/plugin-server/src/cdp/legacy-plugins/customerio/index.ts
new file mode 100644
index 0000000000000..684deb0c22d5e
--- /dev/null
+++ b/plugin-server/src/cdp/legacy-plugins/customerio/index.ts
@@ -0,0 +1,263 @@
+import { ProcessedPluginEvent } from '@posthog/plugin-scaffold'
+import { RetryError } from '@posthog/plugin-scaffold'
+
+import { Response } from '~/src/utils/fetch'
+
+import { LegacyPlugin, LegacyPluginMeta } from '../types'
+import metadata from './plugin.json'
+
+const DEFAULT_HOST = 'track.customer.io'
+const DEFAULT_SEND_EVENTS_FROM_ANONYMOUS_USERS = 'Send all events'
+
+type CustomerIoMeta = LegacyPluginMeta & {
+ config: {
+ customerioSiteId: string
+ customerioToken: string
+ host?: 'track.customer.io' | 'track-eu.customer.io'
+ identifyByEmail?: 'Yes' | 'No'
+ sendEventsFromAnonymousUsers?:
+ | 'Send all events'
+ | 'Only send events from users with emails'
+ | 'Only send events from users that have been identified'
+ eventsToSend?: string
+ }
+ global: {
+ authorizationHeader: string
+ eventNames: string[]
+ eventsConfig: EventsConfig
+ identifyByEmail: boolean
+ }
+}
+
+enum EventsConfig {
+ SEND_ALL = '1',
+ SEND_EMAILS = '2',
+ SEND_IDENTIFIED = '3',
+}
+
+const EVENTS_CONFIG_MAP = {
+ 'Send all events': EventsConfig.SEND_ALL,
+ 'Only send events from users with emails': EventsConfig.SEND_EMAILS,
+ 'Only send events from users that have been identified': EventsConfig.SEND_IDENTIFIED,
+}
+
+interface Customer {
+ status: Set<'seen' | 'identified' | 'with_email'>
+ email: string | null
+}
+
+async function callCustomerIoApi(
+ meta: CustomerIoMeta,
+ method: NonNullable,
+ host: string,
+ path: string,
+ authorization: string,
+ body?: any
+) {
+ const headers: Record = { 'User-Agent': 'PostHog Customer.io App', Authorization: authorization }
+ let bodySerialized: string | undefined
+ if (body != null) {
+ headers['Content-Type'] = 'application/json'
+ bodySerialized = JSON.stringify(body)
+ }
+ let response: Response
+ try {
+ response = await meta.fetch(`https://${host}${path}`, { method, headers, body: bodySerialized })
+ } catch (e) {
+ throw new RetryError(`Cannot reach the Customer.io API. ${e}`)
+ }
+ const responseStatusClass = Math.floor(response.status / 100)
+ if (response.status === 401 || response.status === 403) {
+ const responseData = await response.json()
+ throw new Error(
+ `Customer.io Site ID or API Key invalid! Response ${response.status}: ${JSON.stringify(responseData)}`
+ )
+ }
+ if (response.status === 408 || response.status === 429 || responseStatusClass === 5) {
+ const responseData = await response.json()
+ throw new RetryError(
+ `Received a potentially intermittent error from the Customer.io API. Response ${
+ response.status
+ }: ${JSON.stringify(responseData)}`
+ )
+ }
+ if (responseStatusClass !== 2) {
+ const responseData = await response.json()
+ throw new Error(
+ `Received an unexpected error from the Customer.io API. Response ${response.status}: ${JSON.stringify(
+ responseData
+ )}`
+ )
+ }
+ return response
+}
+
+export const setupPlugin = async (meta: CustomerIoMeta) => {
+ const { config, global, logger } = meta
+ const customerioBase64AuthToken = Buffer.from(`${config.customerioSiteId}:${config.customerioToken}`).toString(
+ 'base64'
+ )
+ global.authorizationHeader = `Basic ${customerioBase64AuthToken}`
+ global.eventNames = config.eventsToSend
+ ? (config.eventsToSend as string)
+ .split(',')
+ .map((name) => name.trim())
+ .filter(Boolean)
+ : []
+ global.eventsConfig =
+ EVENTS_CONFIG_MAP[config.sendEventsFromAnonymousUsers || DEFAULT_SEND_EVENTS_FROM_ANONYMOUS_USERS]
+ global.identifyByEmail = config.identifyByEmail === 'Yes'
+
+ // See https://www.customer.io/docs/api/#operation/getCioAllowlist
+ await callCustomerIoApi(meta, 'GET', 'api.customer.io', '/v1/api/info/ip_addresses', global.authorizationHeader)
+ logger.log('Successfully authenticated with Customer.io. Completing setupPlugin.')
+}
+
+export const onEvent = async (event: ProcessedPluginEvent, meta: CustomerIoMeta) => {
+ const { global, config, logger } = meta
+ // KLUDGE: This shouldn't even run if setupPlugin failed. Needs to be fixed at the plugin server level
+ if (!global.eventNames) {
+ throw new RetryError('Cannot run exportEvents because setupPlugin failed!')
+ }
+
+ if (global.eventNames.length !== 0 && !global.eventNames.includes(event.event)) {
+ return
+ }
+ if (event.event === '$create_alias') {
+ return
+ }
+
+ const customer: Customer = syncCustomerMetadata(meta, event)
+ logger.debug(customer)
+ logger.debug('Should customer be tracked:', shouldCustomerBeTracked(customer, global.eventsConfig))
+ if (!shouldCustomerBeTracked(customer, global.eventsConfig)) {
+ return
+ }
+
+ await exportSingleEvent(
+ meta,
+ event,
+ customer,
+ global.authorizationHeader,
+ config.host || DEFAULT_HOST,
+ global.identifyByEmail
+ )
+}
+
+function syncCustomerMetadata(meta: CustomerIoMeta, event: ProcessedPluginEvent): Customer {
+ const { logger } = meta
+ const email = getEmailFromEvent(event)
+ const customerStatus = new Set() as Customer['status']
+ logger.debug('Detected email:', email)
+
+ // Update customer status
+ customerStatus.add('seen')
+ if (event.event === '$identify') {
+ customerStatus.add('identified')
+ }
+ if (email) {
+ customerStatus.add('with_email')
+ }
+
+ return {
+ status: customerStatus,
+ email,
+ }
+}
+
+function shouldCustomerBeTracked(customer: Customer, eventsConfig: EventsConfig): boolean {
+ switch (eventsConfig) {
+ case EventsConfig.SEND_ALL:
+ return true
+ case EventsConfig.SEND_EMAILS:
+ return customer.status.has('with_email')
+ case EventsConfig.SEND_IDENTIFIED:
+ return customer.status.has('identified')
+ default:
+ throw new Error(`Unknown eventsConfig: ${eventsConfig}`)
+ }
+}
+
+async function exportSingleEvent(
+ meta: CustomerIoMeta,
+ event: ProcessedPluginEvent,
+ customer: Customer,
+ authorizationHeader: string,
+ host: string,
+ identifyByEmail: boolean
+) {
+ // Clean up properties
+ if (event.properties) {
+ delete event.properties['$set']
+ delete event.properties['$set_once']
+ }
+
+ const customerPayload: Record = {
+ ...(event.$set || {}),
+ identifier: event.distinct_id,
+ }
+
+ if ('created_at' in customerPayload) {
+ // Timestamp must be in seconds since UNIX epoch.
+ // See: https://customer.io/docs/journeys/faq-timestamps/.
+ customerPayload.created_at = Date.parse(customerPayload.created_at) / 1000
+ }
+
+ let id = event.distinct_id
+
+ if (customer.email) {
+ customerPayload.email = customer.email
+ if (identifyByEmail) {
+ id = customer.email
+ }
+ }
+ // Create or update customer
+ // See https://www.customer.io/docs/api/#operation/identify
+ await callCustomerIoApi(meta, 'PUT', host, `/api/v1/customers/${id}`, authorizationHeader, customerPayload)
+
+ const eventType = event.event === '$pageview' ? 'page' : event.event === '$screen' ? 'screen' : 'event'
+ const eventTimestamp = (event.timestamp ? new Date(event.timestamp).valueOf() : Date.now()) / 1000
+ // Track event
+ // See https://www.customer.io/docs/api/#operation/track
+ await callCustomerIoApi(meta, 'POST', host, `/api/v1/customers/${id}/events`, authorizationHeader, {
+ name: event.event,
+ type: eventType,
+ timestamp: eventTimestamp,
+ data: event.properties || {},
+ })
+}
+
+function isEmail(email: string): boolean {
+ if (typeof email !== 'string') {
+ return false
+ }
+ const re =
+ /^(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
+ return re.test(email.toLowerCase())
+}
+
+function getEmailFromEvent(event: ProcessedPluginEvent): string | null {
+ if (event.person?.properties?.email) {
+ return event.person.properties.email
+ }
+ const setAttribute = event.$set
+ if (typeof setAttribute !== 'object' || !setAttribute['email']) {
+ return null
+ }
+ const emailCandidate = setAttribute['email']
+ if (isEmail(emailCandidate)) {
+ return emailCandidate
+ }
+ // Use distinct ID as a last resort
+ if (isEmail(event.distinct_id)) {
+ return event.distinct_id
+ }
+ return null
+}
+
+export const customerioPlugin: LegacyPlugin = {
+ id: 'customer-io',
+ metadata: metadata as any,
+ setupPlugin: setupPlugin as any,
+ onEvent,
+}
diff --git a/plugin-server/src/cdp/legacy-plugins/customerio/plugin.json b/plugin-server/src/cdp/legacy-plugins/customerio/plugin.json
new file mode 100644
index 0000000000000..d59671c3d3ab1
--- /dev/null
+++ b/plugin-server/src/cdp/legacy-plugins/customerio/plugin.json
@@ -0,0 +1,60 @@
+{
+ "name": "Customer.io",
+ "description": "Send event data and emails into Customer.io.",
+ "posthogVersion": ">= 1.25.0",
+ "main": "index.ts",
+ "config": [
+ {
+ "key": "customerioSiteId",
+ "hint": "Provided during Customer.io setup.",
+ "name": "Customer.io Site ID",
+ "type": "string",
+ "default": "",
+ "required": true,
+ "secret": true
+ },
+ {
+ "key": "customerioToken",
+ "hint": "Provided during Customer.io setup.",
+ "name": "Customer.io API Key",
+ "type": "string",
+ "default": "",
+ "required": true,
+ "secret": true
+ },
+ {
+ "key": "host",
+ "name": "Tracking Endpoint",
+ "hint": "Use the EU variant if your Customer.io account is based in the EU region.",
+ "type": "choice",
+ "default": "track.customer.io",
+ "choices": ["track.customer.io", "track-eu.customer.io"]
+ },
+ {
+ "key": "identifyByEmail",
+ "name": "Identify by email",
+ "hint": "If enabled, the plugin will identify users by email instead of ID, whenever an email is available.",
+ "type": "choice",
+ "default": "No",
+ "choices": ["Yes", "No"]
+ },
+ {
+ "key": "sendEventsFromAnonymousUsers",
+ "name": "Filtering of Anonymous Users",
+ "type": "choice",
+ "hint": "Customer.io pricing is based on the number of customers. This is an option to only send events from users that have been identified. Take into consideration that merging after identification won't work (as those previously anonymous events won't be there).",
+ "default": "Send all events",
+ "choices": [
+ "Send all events",
+ "Only send events from users that have been identified",
+ "Only send events from users with emails"
+ ]
+ },
+ {
+ "key": "eventsToSend",
+ "name": "PostHog Event Allowlist",
+ "type": "string",
+ "hint": "If this is set, only the specified events (comma-separated) will be sent to Customer.io."
+ }
+ ]
+}
diff --git a/plugin-server/src/cdp/legacy-plugins/index.ts b/plugin-server/src/cdp/legacy-plugins/index.ts
new file mode 100644
index 0000000000000..f15927da6081b
--- /dev/null
+++ b/plugin-server/src/cdp/legacy-plugins/index.ts
@@ -0,0 +1,7 @@
+import { customerioPlugin } from './customerio'
+import { intercomPlugin } from './intercom'
+
+export const PLUGINS_BY_ID = {
+ [customerioPlugin.id]: customerioPlugin,
+ [intercomPlugin.id]: intercomPlugin,
+}
diff --git a/plugin-server/src/cdp/legacy-plugins/intercom/index.ts b/plugin-server/src/cdp/legacy-plugins/intercom/index.ts
new file mode 100644
index 0000000000000..c2eb3b5319841
--- /dev/null
+++ b/plugin-server/src/cdp/legacy-plugins/intercom/index.ts
@@ -0,0 +1,197 @@
+import { ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold'
+
+import { Response } from '~/src/utils/fetch'
+
+import { LegacyPlugin, LegacyPluginMeta } from '../types'
+import metadata from './plugin.json'
+
+type IntercomMeta = LegacyPluginMeta & {
+ global: {
+ intercomUrl: string
+ }
+ config: {
+ intercomApiKey: string
+ triggeringEvents: string
+ ignoredEmailDomains: string
+ useEuropeanDataStorage: string
+ }
+}
+
+async function onEvent(event: ProcessedPluginEvent, meta: IntercomMeta): Promise {
+ if (!isTriggeringEvent(meta.config.triggeringEvents, event.event)) {
+ return
+ }
+
+ const intercomUrl =
+ meta.config.useEuropeanDataStorage === 'Yes' ? 'https://api.eu.intercom.com' : 'https://api.intercom.io'
+
+ const email = getEmailFromEvent(event)
+ if (!email) {
+ meta.logger.warn(
+ `'${event.event}' will not be sent to Intercom because distinct_id is not an email and no 'email' was found in the event properties.`
+ )
+ meta.logger.debug(`Skipped event with UUID ${event.uuid}`)
+ return
+ }
+
+ if (isIgnoredEmailDomain(meta.config.ignoredEmailDomains, email)) {
+ return
+ }
+
+ const timestamp = getTimestamp(meta, event)
+
+ const isContactInIntercom = await searchForContactInIntercom(meta, intercomUrl, meta.config.intercomApiKey, email)
+ if (!isContactInIntercom) {
+ return
+ }
+ await sendEventToIntercom(
+ meta,
+ intercomUrl,
+ meta.config.intercomApiKey,
+ email,
+ event.event,
+ event['distinct_id'],
+ timestamp
+ )
+}
+
+async function searchForContactInIntercom(meta: IntercomMeta, url: string, apiKey: string, email: string) {
+ const searchContactResponse = await fetchWithRetry(
+ meta,
+ `${url}/contacts/search`,
+ {
+ headers: {
+ Accept: 'application/json',
+ 'Content-Type': 'application/json',
+ Authorization: `Bearer ${apiKey}`,
+ },
+ body: JSON.stringify({
+ query: {
+ field: 'email',
+ operator: '=',
+ value: email,
+ },
+ }),
+ },
+ 'POST'
+ )
+ const searchContactResponseJson = (await searchContactResponse.json()) as Record
+
+ if (!statusOk(searchContactResponse) || searchContactResponseJson.errors) {
+ const errorMessage = searchContactResponseJson.errors ? searchContactResponseJson.errors[0].message : ''
+ meta.logger.error(
+ `Unable to search contact ${email} in Intercom. Status Code: ${searchContactResponseJson.status}. Error message: ${errorMessage}`
+ )
+ return false
+ } else {
+ const found = searchContactResponseJson['total_count'] > 0
+ meta.logger.log(`Contact ${email} in Intercom ${found ? 'found' : 'not found'}`)
+ return found
+ }
+}
+
+async function sendEventToIntercom(
+ meta: IntercomMeta,
+ url: string,
+ apiKey: string,
+ email: string,
+ event: string,
+ distinct_id: string,
+ eventSendTime: number
+) {
+ const sendEventResponse = await fetchWithRetry(
+ meta,
+ `${url}/events`,
+ {
+ headers: {
+ Accept: 'application/json',
+ 'Content-Type': 'application/json',
+ Authorization: `Bearer ${apiKey}`,
+ },
+ body: JSON.stringify({
+ event_name: event,
+ created_at: eventSendTime,
+ email,
+ id: distinct_id,
+ }),
+ },
+ 'POST'
+ )
+
+ if (!statusOk(sendEventResponse)) {
+ let errorMessage = ''
+ try {
+ const sendEventResponseJson = await sendEventResponse.json()
+ errorMessage = sendEventResponseJson.errors ? sendEventResponseJson.errors[0].message : ''
+ } catch {}
+ meta.logger.error(
+ `Unable to send event ${event} for ${email} to Intercom. Status Code: ${sendEventResponse.status}. Error message: ${errorMessage}`
+ )
+ } else {
+ meta.logger.log(`Sent event ${event} for ${email} to Intercom`)
+ }
+}
+
+async function fetchWithRetry(meta: IntercomMeta, url: string, options = {}, method = 'GET'): Promise {
+ try {
+ const res = await meta.fetch(url, { method: method, ...options })
+ return res
+ } catch {
+ throw new RetryError('Service is down, retry later')
+ }
+}
+
+function statusOk(res: Response) {
+ return String(res.status)[0] === '2'
+}
+
+function isEmail(email: string): boolean {
+ const re =
+ /^(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
+ return re.test(String(email).toLowerCase())
+}
+
+function getEmailFromEvent(event: ProcessedPluginEvent): string | null {
+ if (isEmail(event.distinct_id)) {
+ return event.distinct_id
+ } else if (event['$set'] && Object.keys(event['$set']).includes('email')) {
+ if (isEmail(event['$set']['email'])) {
+ return event['$set']['email']
+ }
+ } else if (event['properties'] && Object.keys(event['properties']).includes('email')) {
+ if (isEmail(event['properties']['email'])) {
+ return event['properties']['email']
+ }
+ }
+
+ return null
+}
+
+function isIgnoredEmailDomain(ignoredEmailDomains: string, email: string): boolean {
+ const emailDomainsToIgnore = (ignoredEmailDomains || '').split(',').map((e) => e.trim())
+ return emailDomainsToIgnore.includes(email.split('@')[1])
+}
+
+function isTriggeringEvent(triggeringEvents: string, event: string): boolean {
+ const validEvents = (triggeringEvents || '').split(',').map((e) => e.trim())
+ return validEvents.indexOf(event) >= 0
+}
+
+function getTimestamp(meta: IntercomMeta, event: ProcessedPluginEvent): number {
+ try {
+ if (event['timestamp']) {
+ return Number(event['timestamp'])
+ }
+ } catch {
+ meta.logger.error('Event timestamp cannot be parsed as a number')
+ }
+ const date = new Date()
+ return Math.floor(date.getTime() / 1000)
+}
+
+export const intercomPlugin: LegacyPlugin = {
+ id: 'intercom',
+ metadata: metadata as any,
+ onEvent,
+ setupPlugin: () => Promise.resolve(),
+}
diff --git a/plugin-server/src/cdp/legacy-plugins/intercom/plugin.json b/plugin-server/src/cdp/legacy-plugins/intercom/plugin.json
new file mode 100644
index 0000000000000..6de2b2723ffcb
--- /dev/null
+++ b/plugin-server/src/cdp/legacy-plugins/intercom/plugin.json
@@ -0,0 +1,42 @@
+{
+ "name": "Intercom",
+ "url": "TODO",
+ "description": "Send event data to Intercom on PostHog events.",
+ "main": "index.ts",
+ "config": [
+ {
+ "key": "intercomApiKey",
+ "hint": "Create an [Intercom app](https://developers.intercom.com/building-apps/), then go to Configure > Authentication to find your key.",
+ "name": "Intercom API Key",
+ "type": "string",
+ "default": "",
+ "required": true,
+ "secret": true
+ },
+ {
+ "key": "triggeringEvents",
+ "hint": "A comma-separated list of PostHog events you want to send to Intercom (e.g.: '$identify,mycustomevent' ).",
+ "name": "Triggering events",
+ "type": "string",
+ "default": "$identify",
+ "required": true
+ },
+ {
+ "key": "ignoredEmailDomains",
+ "hint": "A comma-separated list of email domains to ignore and not send events for in Intercom (e.g. 'posthog.com,dev.posthog.com' ).",
+ "name": "Email domains to skip",
+ "type": "string",
+ "default": "",
+ "required": false
+ },
+ {
+ "key": "useEuropeanDataStorage",
+ "hint": "Send events to api.eu.intercom.com, if you are using Intercom's European Data Hosting.",
+ "name": "Send events to European Data Hosting",
+ "type": "choice",
+ "default": "No",
+ "choices": ["Yes", "No"],
+ "required": false
+ }
+ ]
+}
diff --git a/plugin-server/src/cdp/legacy-plugins/types.ts b/plugin-server/src/cdp/legacy-plugins/types.ts
new file mode 100644
index 0000000000000..f9df0a48e7ad1
--- /dev/null
+++ b/plugin-server/src/cdp/legacy-plugins/types.ts
@@ -0,0 +1,28 @@
+import { PluginConfigSchema, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
+
+import { Response, trackedFetch } from '~/src/utils/fetch'
+
+export type LegacyPluginLogger = {
+ debug: (...args: any[]) => void
+ warn: (...args: any[]) => void
+ log: (...args: any[]) => void
+ error: (...args: any[]) => void
+}
+
+export type LegacyPluginMeta = {
+ config: Record
+ global: Record
+
+ logger: LegacyPluginLogger
+ fetch: (...args: Parameters) => Promise
+}
+
+export type LegacyPlugin = {
+ id: string
+ metadata: {
+ name: string
+ config: PluginConfigSchema[]
+ }
+ onEvent(event: ProcessedPluginEvent, meta: LegacyPluginMeta): Promise
+ setupPlugin?: (meta: LegacyPluginMeta) => Promise
+}
diff --git a/plugin-server/src/cdp/services/hog-executor.service.ts b/plugin-server/src/cdp/services/hog-executor.service.ts
index 9962d51e03b24..5514d03206d28 100644
--- a/plugin-server/src/cdp/services/hog-executor.service.ts
+++ b/plugin-server/src/cdp/services/hog-executor.service.ts
@@ -93,7 +93,7 @@ export const formatInput = (bytecode: any, globals: HogFunctionInvocation['globa
}
}
-const sanitizeLogMessage = (args: any[], sensitiveValues?: string[]): string => {
+export const sanitizeLogMessage = (args: any[], sensitiveValues?: string[]): string => {
let message = args.map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg)).join(', ')
// Find and replace any sensitive values
diff --git a/plugin-server/src/cdp/services/hog-function-manager.service.test.ts b/plugin-server/src/cdp/services/hog-function-manager.service.test.ts
index c4530200b8b5c..b31892365e316 100644
--- a/plugin-server/src/cdp/services/hog-function-manager.service.test.ts
+++ b/plugin-server/src/cdp/services/hog-function-manager.service.test.ts
@@ -70,24 +70,6 @@ describe('HogFunctionManager', () => {
})
)
- // hogFunctions.push(
- // await insertHogFunction(hub.postgres, teamId1, {
- // name: 'Email Provider team 1',
- // type: 'email',
- // inputs_schema: [
- // {
- // type: 'email',
- // key: 'message',
- // },
- // ],
- // inputs: {
- // email: {
- // value: { from: 'me@a.com', to: 'you@b.com', subject: 'subject', html: 'text' },
- // },
- // },
- // })
- // )
-
hogFunctions.push(
await insertHogFunction(hub.postgres, teamId2, {
name: 'Test Hog Function team 2',
@@ -149,6 +131,7 @@ describe('HogFunctionManager', () => {
encrypted_inputs: null,
masking: null,
mappings: null,
+ template_id: null,
depends_on_integration_ids: new Set([integrations[0].id]),
},
])
diff --git a/plugin-server/src/cdp/services/hog-function-manager.service.ts b/plugin-server/src/cdp/services/hog-function-manager.service.ts
index 0c4a693b22f46..0c3b6d04052c1 100644
--- a/plugin-server/src/cdp/services/hog-function-manager.service.ts
+++ b/plugin-server/src/cdp/services/hog-function-manager.service.ts
@@ -25,6 +25,7 @@ const HOG_FUNCTION_FIELDS = [
'bytecode',
'masking',
'type',
+ 'template_id',
]
export class HogFunctionManagerService {
diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts
index bd154a2c2d29c..1a0cd607c7a36 100644
--- a/plugin-server/src/cdp/types.ts
+++ b/plugin-server/src/cdp/types.ts
@@ -212,7 +212,7 @@ export type HogFunctionInvocation = {
teamId: Team['id']
hogFunction: HogFunctionType
priority: number
- queue: 'hog' | 'fetch'
+ queue: 'hog' | 'fetch' | 'plugins'
queueParameters?: HogFunctionInvocationQueueParameters
// The current vmstate (set if the invocation is paused)
vmState?: VMState
@@ -305,6 +305,7 @@ export type HogFunctionType = {
mappings?: HogFunctionMappingType[] | null
masking?: HogFunctionMasking | null
depends_on_integration_ids?: Set
+ template_id?: string
}
export type HogFunctionInputType = {
diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index 5de8a99a0c74c..447c921c645ae 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -187,7 +187,6 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_WATCHER_TTL: 60 * 60 * 24, // This is really long as it is essentially only important to make sure the key is eventually deleted
CDP_WATCHER_REFILL_RATE: 10,
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
- CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
CDP_HOG_FILTERS_TELEMETRY_TEAMS: '',
CDP_REDIS_PASSWORD: '',
CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true,
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index d6c8251b294c5..6807430d9e931 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -11,6 +11,7 @@ import v8Profiler from 'v8-profiler-next'
import { getPluginServerCapabilities } from '../capabilities'
import { CdpApi } from '../cdp/cdp-api'
+import { CdpCyclotronWorkerPlugins } from '../cdp/consumers/cdp-cyclotron-plugins-worker.consumer'
import { CdpCyclotronWorker, CdpCyclotronWorkerFetch } from '../cdp/consumers/cdp-cyclotron-worker.consumer'
import { CdpInternalEventsConsumer } from '../cdp/consumers/cdp-internal-event.consumer'
import { CdpProcessedEventsConsumer } from '../cdp/consumers/cdp-processed-events.consumer'
@@ -547,6 +548,17 @@ export async function startPluginsServer(
}
}
+ if (capabilities.cdpCyclotronWorkerPlugins) {
+ const hub = await setupHub()
+ if (!hub.CYCLOTRON_DATABASE_URL) {
+ status.error('💥', 'Cyclotron database URL not set.')
+ } else {
+ const worker = new CdpCyclotronWorkerPlugins(hub)
+ await worker.start()
+ services.push(worker.service)
+ }
+ }
+
if (capabilities.http) {
const app = setupCommonRoutes(services)
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 9cf3d1bdf7f92..ec42460600740 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -90,6 +90,7 @@ export enum PluginServerMode {
cdp_processed_events = 'cdp-processed-events',
cdp_internal_events = 'cdp-internal-events',
cdp_cyclotron_worker = 'cdp-cyclotron-worker',
+ cdp_cyclotron_worker_plugins = 'cdp-cyclotron-worker-plugins',
functional_tests = 'functional-tests',
}
@@ -118,7 +119,6 @@ export type CdpConfig = {
CDP_WATCHER_REFILL_RATE: number // The number of tokens to be refilled per second
CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently
- CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
CDP_HOG_FILTERS_TELEMETRY_TEAMS: string
CDP_CYCLOTRON_BATCH_SIZE: number
CDP_CYCLOTRON_BATCH_DELAY_MS: number
@@ -386,6 +386,7 @@ export interface PluginServerCapabilities {
cdpProcessedEvents?: boolean
cdpInternalEvents?: boolean
cdpCyclotronWorker?: boolean
+ cdpCyclotronWorkerPlugins?: boolean
appManagementSingleton?: boolean
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
diff --git a/plugin-server/src/utils/fetch.ts b/plugin-server/src/utils/fetch.ts
index 96358d8ec2864..0fcd39cd20dc1 100644
--- a/plugin-server/src/utils/fetch.ts
+++ b/plugin-server/src/utils/fetch.ts
@@ -9,6 +9,8 @@ import net from 'node:net'
import fetch, { type RequestInfo, type RequestInit, type Response, FetchError, Request } from 'node-fetch'
import { URL } from 'url'
+export type { Response }
+
import { runInSpan } from '../sentry'
import { isProdEnv } from './env-utils'
diff --git a/plugin-server/tests/cdp/cdp-e2e.test.ts b/plugin-server/tests/cdp/cdp-e2e.test.ts
index cce755643caa9..425eeeb37f6b7 100644
--- a/plugin-server/tests/cdp/cdp-e2e.test.ts
+++ b/plugin-server/tests/cdp/cdp-e2e.test.ts
@@ -34,6 +34,7 @@ describe('CDP Consumer loop', () => {
let processedEventsConsumer: CdpProcessedEventsConsumer
let cyclotronWorker: CdpCyclotronWorker | undefined
let cyclotronFetchWorker: CdpCyclotronWorkerFetch | undefined
+
let hub: Hub
let team: Team
let fnFetchNoFilters: HogFunctionType
diff --git a/posthog/cdp/migrations.py b/posthog/cdp/migrations.py
new file mode 100644
index 0000000000000..5156c5e496aac
--- /dev/null
+++ b/posthog/cdp/migrations.py
@@ -0,0 +1,123 @@
+from posthog.api.hog_function import HogFunctionSerializer
+from posthog.models.hog_functions.hog_function import HogFunction
+from posthog.models.plugin import PluginAttachment, PluginConfig
+
+
+def migrate_legacy_plugins(dry_run=True, team_ids=None, test_mode=True):
+ # Get all legacy plugin_configs that are active with their attachments and global values
+ legacy_plugins = PluginConfig.objects.select_related("plugin").filter(enabled=True)
+
+ if team_ids:
+ legacy_plugins = legacy_plugins.filter(team_id__in=team_ids)
+
+ hog_functions = []
+
+ for plugin_config in legacy_plugins:
+ methods = plugin_config.plugin.capabilities.get("methods", [])
+
+ if "onEvent" not in methods or "composeWebhook" not in methods:
+ print("Skipping plugin", plugin_config.plugin.name, "as it doesn't have onEvent or composeWebhook") # noqa: T201
+ continue
+
+ print("Attempting to migrate plugin", plugin_config) # noqa: T201
+ url: str = plugin_config.plugin.url or ""
+
+ if not url:
+ print("Skipping plugin", plugin_config.plugin.name, "as it doesn't have a url") # noqa: T201
+ continue
+
+ plugin_id = url.replace("inline://", "").replace("https://github.com/PostHog/", "")
+ plugin_name = plugin_config.plugin.name
+
+ if test_mode:
+ plugin_name = f"[CDP-TEST-HIDDEN] {plugin_name}"
+
+ inputs = {}
+ inputs_schema = []
+
+ # Iterate over the plugin config to build the inputs
+
+ for schema in plugin_config.plugin.config_schema:
+ if not schema.get("key"):
+ continue
+
+ print("Converting schema", schema) # noqa: T201
+
+ # Some hacky stuff to convert the schemas correctly
+ input_schema = {
+ "key": schema["key"],
+ "type": schema["type"],
+ "label": schema.get("name", schema["key"]),
+ "description": schema.get("hint", ""),
+ "secret": schema.get("secret", False),
+ "required": schema.get("required", False),
+ "default": schema.get("default", None),
+ }
+
+ if schema["type"] == "choice":
+ input_schema["choices"] = [
+ {
+ "label": choice,
+ "value": choice,
+ }
+ for choice in schema["choices"]
+ ]
+ input_schema["type"] = "string"
+ elif schema["type"] == "attachment":
+ input_schema["type"] = "string"
+
+ inputs_schema.append(input_schema)
+
+ for key, value in plugin_config.config.items():
+ inputs[key] = {"value": value}
+
+ # Load all attachments for this plugin config
+ attachments = PluginAttachment.objects.filter(plugin_config=plugin_config)
+
+ for attachment in attachments:
+ inputs[attachment.key] = {"value": attachment.parse_contents()}
+
+ serializer_context = {"team": plugin_config.team, "get_team": (lambda config=plugin_config: config.team)}
+
+ data = {
+ "template_id": f"plugin-{plugin_id}",
+ "type": "destination",
+ "name": plugin_name,
+ "description": "This is a legacy destination migrated from our old plugin system.",
+ "filters": {},
+ "inputs": inputs,
+ "inputs_schema": inputs_schema,
+ "enabled": True,
+ "icon_url": plugin_config.plugin.icon,
+ }
+
+ print("Attempting to create hog function...") # noqa: T201
+
+ serializer = HogFunctionSerializer(
+ data=data,
+ context=serializer_context,
+ )
+ serializer.is_valid(raise_exception=True)
+ hog_functions.append(HogFunction(**serializer.validated_data))
+
+ print(hog_functions) # noqa: T201
+
+ if not hog_functions:
+ print("No hog functions to create") # noqa: T201
+ return []
+
+ if dry_run:
+ print("Dry run, not creating hog functions") # noqa: T201
+ return hog_functions
+
+ print("Creating hog functions") # noqa: T201
+ HogFunction.objects.bulk_create(hog_functions)
+
+ if not test_mode:
+ print("Disabling old plugins") # noqa: T201
+ # Disable the old plugins
+ PluginConfig.objects.filter(id__in=[plugin_config.id for plugin_config in legacy_plugins]).update(enabled=False)
+
+ print("Done") # noqa: T201
+
+ return hog_functions
diff --git a/posthog/cdp/templates/_internal/template_legacy_plugin.py b/posthog/cdp/templates/_internal/template_legacy_plugin.py
new file mode 100644
index 0000000000000..184251cc6b2c6
--- /dev/null
+++ b/posthog/cdp/templates/_internal/template_legacy_plugin.py
@@ -0,0 +1,17 @@
+from posthog.cdp.templates.hog_function_template import HogFunctionTemplate
+
+
+def create_legacy_plugin_template(template_id: str) -> HogFunctionTemplate:
+ return HogFunctionTemplate(
+ status="alpha",
+ type="destination",
+ id=f"{template_id}",
+ name=f"Legacy plugin {template_id}",
+ description="Legacy plugins",
+ icon_url="/static/hedgehog/builder-hog-01.png",
+ category=["Custom"],
+ hog="""
+ print('not used')
+ """.strip(),
+ inputs_schema=[],
+ )
diff --git a/posthog/management/commands/migrate_plugins_to_hog_functions.py b/posthog/management/commands/migrate_plugins_to_hog_functions.py
new file mode 100644
index 0000000000000..ccc58b88604ab
--- /dev/null
+++ b/posthog/management/commands/migrate_plugins_to_hog_functions.py
@@ -0,0 +1,27 @@
+from django.core.management.base import BaseCommand
+
+from posthog.cdp.migrations import migrate_legacy_plugins
+
+
+class Command(BaseCommand):
+ help = "Migrate plugins to HogFunctions"
+
+ def add_arguments(self, parser):
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="If set, will not actually perform the migration, but will print out what would have been done",
+ )
+ parser.add_argument("--team-ids", type=str, help="Comma separated list of team ids to sync")
+ parser.add_argument(
+ "--test-mode", action="store_true", help="Whether to just copy as a test function rather than migrate"
+ )
+
+ def handle(self, *args, **options):
+ dry_run = options["dry_run"]
+ team_ids = options["team_ids"]
+ test_mode = options["test_mode"]
+
+ print("Migrating plugins to hog functions", options) # noqa: T201
+
+ migrate_legacy_plugins(dry_run=dry_run, team_ids=team_ids, test_mode=test_mode)
diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py
index a715f10b86b7b..244f887d3724c 100644
--- a/posthog/models/hog_functions/hog_function.py
+++ b/posthog/models/hog_functions/hog_function.py
@@ -6,6 +6,7 @@
from django.dispatch.dispatcher import receiver
import structlog
+from posthog.cdp.templates._internal.template_legacy_plugin import create_legacy_plugin_template
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate
from posthog.helpers.encrypted_fields import EncryptedJSONStringField
from posthog.models.action.action import Action
@@ -96,6 +97,9 @@ class Meta:
def template(self) -> Optional[HogFunctionTemplate]:
from posthog.cdp.templates import ALL_HOG_FUNCTION_TEMPLATES_BY_ID
+ if self.template_id and self.template_id.startswith("plugin-"):
+ return create_legacy_plugin_template(self.template_id)
+
return ALL_HOG_FUNCTION_TEMPLATES_BY_ID.get(self.template_id, None)
@property
diff --git a/posthog/models/plugin.py b/posthog/models/plugin.py
index d2e204bdf3211..a01a7347efb17 100644
--- a/posthog/models/plugin.py
+++ b/posthog/models/plugin.py
@@ -1,4 +1,5 @@
import datetime
+import json
import os
import subprocess
from dataclasses import dataclass
@@ -274,6 +275,21 @@ class PluginAttachment(models.Model):
file_size = models.IntegerField()
contents = models.BinaryField()
+ def parse_contents(self) -> str | None:
+ contents: bytes | None = self.contents
+ if not contents:
+ return None
+
+ try:
+ if self.content_type == "application/json":
+ return json.loads(contents)
+
+ if self.content_type == "text/plain":
+ return contents.decode("utf-8")
+ return None
+ except Exception:
+ return None
+
class PluginStorage(models.Model):
plugin_config = models.ForeignKey("PluginConfig", on_delete=models.CASCADE)