Skip to content

Commit

Permalink
feat(cdp): enable execution of ordered hogTransformations (#27963)
Browse files Browse the repository at this point in the history
  • Loading branch information
meikelmosby authored Jan 29, 2025
1 parent 350d2d7 commit 2ae0269
Show file tree
Hide file tree
Showing 12 changed files with 510 additions and 43 deletions.

Large diffs are not rendered by default.

119 changes: 119 additions & 0 deletions plugin-server/src/cdp/services/hog-function-manager.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ describe('HogFunctionManager', () => {
name: 'Test Hog Function team 1',
type: 'destination',
enabled: true,
execution_order: null,
bytecode: {},
filters: null,
inputs_schema: [
Expand Down Expand Up @@ -221,3 +222,121 @@ describe('HogFunctionManager', () => {
})
})
})

describe('Hogfunction Manager - Execution Order', () => {
let hub: Hub
let manager: HogFunctionManagerService
let hogFunctions: HogFunctionType[]
let teamId: number

beforeEach(async () => {
hub = await createHub()
await resetTestDatabase()
manager = new HogFunctionManagerService(hub)

const team = await hub.db.fetchTeam(2)
teamId = await createTeam(hub.db.postgres, team!.organization_id)

hogFunctions = []

hogFunctions.push(
await insertHogFunction(hub.postgres, teamId, {
name: 'fn1',
execution_order: 1,
type: 'transformation',
})
)

hogFunctions.push(
await insertHogFunction(hub.postgres, teamId, {
name: 'fn2',
execution_order: 2,
type: 'transformation',
})
)

hogFunctions.push(
await insertHogFunction(hub.postgres, teamId, {
name: 'fn3',
execution_order: 3,
type: 'transformation',
})
)

await manager.start(['transformation'])
})

afterEach(async () => {
await manager.stop()
await closeHub(hub)
})

it('maintains correct execution order after individual reloads', async () => {
// Initial order check
let teamFunctions = manager.getTeamHogFunctions(teamId)
expect(teamFunctions).toHaveLength(3)
expect(teamFunctions.map((f) => ({ name: f.name, order: f.execution_order }))).toEqual([
{ name: 'fn1', order: 1 },
{ name: 'fn2', order: 2 },
{ name: 'fn3', order: 3 },
])

// change order in database and reload single functions to simulate changes over the django API.

// Update fn2's to be last
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET execution_order = 3 WHERE id = $1`,
[hogFunctions[1].id],
'testKey'
)

// therefore fn3's execution order should be 2
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET execution_order = 2 WHERE id = $1`,
[hogFunctions[2].id],
'testKey'
)

await manager.reloadHogFunctions(teamId, [hogFunctions[1].id, hogFunctions[2].id])
teamFunctions = manager.getTeamHogFunctions(teamId)
expect(teamFunctions).toHaveLength(3)
expect(teamFunctions.map((f) => ({ name: f.name, order: f.execution_order }))).toEqual([
{ name: 'fn1', order: 1 },
{ name: 'fn3', order: 2 },
{ name: 'fn2', order: 3 },
])

// change fn1 to be last
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET execution_order = 3 WHERE id = $1`,
[hogFunctions[0].id],
'testKey'
)
// change fn3 to be first
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET execution_order = 1 WHERE id = $1`,
[hogFunctions[2].id],
'testKey'
)
// change fn2 to be second
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET execution_order = 2 WHERE id = $1`,
[hogFunctions[1].id],
'testKey'
)

await manager.reloadHogFunctions(teamId, [hogFunctions[0].id, hogFunctions[1].id, hogFunctions[2].id])
teamFunctions = manager.getTeamHogFunctions(teamId)
expect(teamFunctions).toHaveLength(3)
expect(teamFunctions.map((f) => ({ name: f.name, order: f.execution_order }))).toEqual([
{ name: 'fn3', order: 1 },
{ name: 'fn2', order: 2 },
{ name: 'fn1', order: 3 },
])
})
})
40 changes: 27 additions & 13 deletions plugin-server/src/cdp/services/hog-function-manager.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { HogFunctionType, HogFunctionTypeType, IntegrationType } from '../types'
type HogFunctionCache = {
functions: Record<HogFunctionType['id'], HogFunctionType | undefined>
teams: Record<Team['id'], HogFunctionType['id'][] | undefined>
orderedTeamFunctions: Record<Team['id'], HogFunctionType[] | undefined>
}

const HOG_FUNCTION_FIELDS = [
Expand All @@ -26,6 +27,7 @@ const HOG_FUNCTION_FIELDS = [
'masking',
'type',
'template_id',
'execution_order',
]

export class HogFunctionManagerService {
Expand All @@ -42,6 +44,7 @@ export class HogFunctionManagerService {
this.cache = {
functions: {},
teams: {},
orderedTeamFunctions: {},
}

this.pubSub = new PubSub(this.hub, {
Expand Down Expand Up @@ -94,28 +97,35 @@ export class HogFunctionManagerService {
throw new Error('HogFunctionManagerService is not ready! Run HogFunctionManagerService.start() before this')
}

return Object.values(this.cache.teams[teamId] || [])
.map((id) => this.cache.functions[id])
.filter((x) => !!x) as HogFunctionType[]
}
if (!this.cache.orderedTeamFunctions[teamId]) {
const functions = Object.values(this.cache.teams[teamId] || [])
.map((id) => this.cache.functions[id])
.filter((x) => !!x) as HogFunctionType[]

public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManagerService is not ready! Run HogFunctionManagerService.start() before this')
this.cache.orderedTeamFunctions[teamId] = this.sortHogFunctions(functions)
}

return this.cache.functions[id]
return this.cache.orderedTeamFunctions[teamId] || []
}

private sortHogFunctions(functions: HogFunctionType[]): HogFunctionType[] {
return [...functions].sort((a, b) => {
if (a.execution_order === null || a.execution_order === undefined) {
return 1
}
if (b.execution_order === null || b.execution_order === undefined) {
return -1
}
return a.execution_order - b.execution_order
})
}

public getTeamHogFunction(teamId: Team['id'], hogFunctionId: HogFunctionType['id']): HogFunctionType | undefined {
public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManagerService is not ready! Run HogFunctionManagerService.start() before this')
}

const fn = this.cache.functions[hogFunctionId]
if (fn?.team_id === teamId) {
return fn
}
return this.cache.functions[id]
}

public teamHasHogDestinations(teamId: Team['id']): boolean {
Expand All @@ -130,6 +140,7 @@ export class HogFunctionManagerService {
SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
FROM posthog_hogfunction
WHERE deleted = FALSE AND enabled = TRUE AND type = ANY($1)
ORDER BY execution_order NULLS LAST
`,
[this.hogTypes],
'fetchAllHogFunctions'
Expand All @@ -142,6 +153,7 @@ export class HogFunctionManagerService {
const cache: HogFunctionCache = {
functions: {},
teams: {},
orderedTeamFunctions: {},
}

for (const item of items) {
Expand Down Expand Up @@ -181,6 +193,8 @@ export class HogFunctionManagerService {
this.cache.teams[teamId] = this.cache.teams[teamId] || []
this.cache.teams[teamId]!.push(item.id)
}

delete this.cache.orderedTeamFunctions[teamId]
}

public async fetchHogFunction(id: HogFunctionType['id']): Promise<HogFunctionType | null> {
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ export type HogFunctionType = {
masking?: HogFunctionMasking | null
depends_on_integration_ids?: Set<IntegrationType['id']>
template_id?: string
execution_order?: number
}

export type HogFunctionInputType = {
Expand Down
Loading

0 comments on commit 2ae0269

Please sign in to comment.