Skip to content

Commit

Permalink
feat(camunda8): support broadcastSignal over REST
Browse files Browse the repository at this point in the history
fixes #248
  • Loading branch information
jwulf committed Sep 18, 2024
1 parent 5bb6c45 commit 41915cd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 8 deletions.
52 changes: 52 additions & 0 deletions src/__tests__/c8/rest/restBroadcastSignal.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { C8RestClient } from '../../../c8/lib/C8RestClient'
import { LosslessDto } from '../../../lib'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'

jest.setTimeout(60000)

const c8 = new C8RestClient()
let pid: string

beforeAll(async () => {
const res = await c8.deployResourcesFromFiles([
'./src/__tests__/testdata/Signal.bpmn',
])
pid = res.processes[0].processDefinitionKey
await cancelProcesses(pid)
})

afterAll(async () => {
await cancelProcesses(pid)
})

test('Can start a process with a signal', async () => {
await c8.deployResourcesFromFiles(['./src/__tests__/testdata/Signal.bpmn'])

const res = await c8.broadcastSignal({
signalName: 'test-signal',
variables: {
success: true,
},
})

expect(res.key).toBeTruthy()

await new Promise((resolve) =>
c8.createJobWorker({
type: 'signal-service-task',
worker: 'signal-worker',
timeout: 10000,
pollIntervalMs: 1000,
maxJobsToActivate: 10,
inputVariableDto: class extends LosslessDto {
success!: boolean
},
jobHandler: (job) => {
const ack = job.complete()
expect(job.variables.success).toBe(true)
resolve(null)
return ack
},
})
)
})
9 changes: 9 additions & 0 deletions src/c8/lib/C8Dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,12 @@ export interface MigrationRequest {
/** A reference key chosen by the user that will be part of all records resulting from this operation. Must be > 0 if provided. */
operationReference?: number | LosslessNumber
}

/** The signal was broadcast. */
export class BroadcastSignalResponse extends LosslessDto {
@Int64String
/** The unique ID of the signal that was broadcast. */
key!: string
/** The tenant ID of the signal that was broadcast. */
tenantId!: string
}
29 changes: 21 additions & 8 deletions src/c8/lib/C8RestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
import { IOAuthProvider } from '../../oauth'
import {
ActivateJobsRequest,
BroadcastSignalReq,
CompleteJobRequest,
CreateProcessInstanceReq,
ErrorJobWithVariables,
Expand All @@ -39,6 +40,7 @@ import {
} from '../../zeebe/types'

import {
BroadcastSignalResponse,
CreateProcessInstanceResponse,
Ctor,
DecisionDeployment,
Expand Down Expand Up @@ -124,18 +126,29 @@ export class C8RestClient {
return headers
}

/* Get the topology of the Zeebe cluster. */
public async getTopology(): Promise<TopologyResponse> {
/**
* Broadcasts a signal.
*/
public async broadcastSignal(req: BroadcastSignalReq) {
const headers = await this.getHeaders()
const request = this.addDefaultTenantId(req)
return this.rest.then((rest) =>
rest
.get('topology', { headers })
.json()
.catch((error) => {
trace('error', error)
throw error
.post(`signals/broadcast`, {
headers,
body: stringify(request),
parseJson: (text) => losslessParse(text, BroadcastSignalResponse),
})
) as Promise<TopologyResponse>
.json<BroadcastSignalResponse>()
)
}

/* Get the topology of the Zeebe cluster. */
public async getTopology() {
const headers = await this.getHeaders()
return this.rest.then((rest) =>
rest.get('topology', { headers }).json<TopologyResponse>()
)
}

/* Completes a user task with the given key. The method either completes the task or throws 400, 404, or 409.
Expand Down

0 comments on commit 41915cd

Please sign in to comment.