Skip to content

Commit

Permalink
feat(zeebe): create and cancel process instances over REST
Browse files Browse the repository at this point in the history
  • Loading branch information
jwulf committed Sep 9, 2024
1 parent d6303a4 commit 3856118
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Entity keys in Camunda 8 are stored and represented as `int64` numbers. The rang

Some number values - for example: "_total returned results_ " - may be specified as `int64` in the API specifications. Although these numbers will usually not contain unsafe values, they are always serialised to `string`.

For `int64` values whose type is not known ahead of time, such as job variables, you can pass an annotated data transfer object (DTO) to decode them reliably. If no DTO is specified, the default behavior of the SDK is to serialise all numbers to JavaScript `number`, and if a number value is detected at a runtime that cannot be accurately stored as `number`, to throw an exception.
For `int64` values whose type is not known ahead of time, such as job variables, you can pass an annotated data transfer object (DTO) to decode them reliably. If no DTO is specified, the default behavior of the SDK is to serialise all numbers to JavaScript `number`, and to throw an exception if a number value is detected at a runtime that cannot be accurately represented as the JavaScript `number` type (that is, a value greater than 2^53-1).

## Authorization

Expand Down
55 changes: 52 additions & 3 deletions src/c8/lib/C8RestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { IOAuthProvider } from '../../oauth'
import {
ActivateJobsRequest,
CompleteJobRequest,
CreateProcessInstanceRequest,
ErrorJobWithVariables,
FailJobRequest,
IProcessVariables,
Expand Down Expand Up @@ -58,6 +59,7 @@ export class C8RestClient {
options?.oAuthProvider ?? constructOAuthProvider(config)
this.userAgentString = createUserAgentString(config)
this.tenantId = config.CAMUNDA_TENANT_ID

const baseUrl = RequireConfiguration(
config.ZEEBE_REST_ADDRESS,
'ZEEBE_REST_ADDRESS'
Expand Down Expand Up @@ -269,7 +271,7 @@ export class C8RestClient {
...req
} = request

const body = losslessStringify(this.addDefaultTenantId(req))
const body = losslessStringify(this.addDefaultTenantIds(req))

const jobDto = createSpecializedRestApiJobClass(
inputVariableDto,
Expand Down Expand Up @@ -364,6 +366,9 @@ export class C8RestClient {
)
}

/**
* Marks the incident as resolved; most likely a call to Update job will be necessary to reset the job’s retries, followed by this call.
*/
public async resolveIncident(incidentKey: string) {
const headers = await this.getHeaders()

Expand All @@ -374,6 +379,40 @@ export class C8RestClient {
)
}

/**
* Create and start a process instance
*/
public async createProcessInstance(request: CreateProcessInstanceRequest) {
const headers = await this.getHeaders()

return this.rest.then((rest) =>
rest.post(`process-instances`, {
body: losslessStringify(this.addDefaultTenantId(request)),
headers,
})
)
}

/**
* Cancel an active process instance
*/
public async cancelProcessInstance({
processInstanceKey,
operationReference,
}: {
processInstanceKey: string
operationReference?: string
}) {
const headers = await this.getHeaders()

return this.rest.then((rest) =>
rest.post(`process-instances/${processInstanceKey}/cancellation`, {
body: JSON.stringify({ operationReference }),
headers,
})
)
}

private addJobMethods = <Variables, CustomHeaders>(
job: Job<Variables, CustomHeaders>
): Job<Variables, CustomHeaders> &
Expand All @@ -396,14 +435,24 @@ export class C8RestClient {
fail: (failJobRequest) => this.failJob(failJobRequest),
/* At this point, no capacity handling in the SDK is implemented, so this has no effect */
forward: () => JOB_ACTION_ACKNOWLEDGEMENT,
modifyJobTimeout: ({ newTimeoutMs }: { newTimeoutMs: number }) =>
this.updateJob({ jobKey: job.key, timeout: newTimeoutMs }),
}
}

/**
* Helper method to add the default tenantIds if we are not passed explicit tenantIds
*/
private addDefaultTenantId<T extends { tenantIds?: string[] }>(request: T) {
const tenantIds = request.tenantIds ?? this.tenantId ? [this.tenantId] : []
private addDefaultTenantId<T extends { tenantId?: string }>(request: T) {
const tenantId = request.tenantId ?? this.tenantId
return { ...request, tenantId }
}

/**
* Helper method to add the default tenantIds if we are not passed explicit tenantIds
*/
private addDefaultTenantIds<T extends { tenantIds?: string[] }>(request: T) {
const tenantIds = request.tenantIds ?? [this.tenantId]
return { ...request, tenantIds }
}
}
5 changes: 5 additions & 0 deletions src/lib/LosslessJsonParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
*
* It also handles nested Dtos by using the `@ChildDto` decorator.
*
* Update: added an optional `key` parameter to support the Camunda 8 REST API's use of an array under a key, e.g. { jobs : Job[] }
*
* More details on the design here: https://github.com/camunda/camunda-8-js-sdk/issues/81#issuecomment-2022213859
*
* See this article to understand why this is necessary: https://jsoneditoronline.org/indepth/parse/why-does-json-parse-corrupt-large-numbers/
*/

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down Expand Up @@ -127,6 +131,7 @@ export function losslessParseArray<T = any>(
}

/**
* losslessParse uses lossless-json parse to deserialize JSON.
* With no Dto, the parser will throw if it encounters an int64 number that cannot be safely represented as a JS number.
*
* @param json the JSON string to parse
Expand Down
9 changes: 7 additions & 2 deletions src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ClientReadableStream } from '@grpc/grpc-js'
import { Chalk } from 'chalk'
import { Response } from 'got'
import { LosslessNumber } from 'lossless-json'
import { MaybeTimeDuration } from 'typed-duration'

Expand Down Expand Up @@ -232,9 +233,13 @@ export interface JobCompletionInterfaceRest<WorkerOutputVariables> {
*/
error: (error: ErrorJobWithVariables) => Promise<JOB_ACTION_ACKNOWLEDGEMENT>
/**
* Extend the timeout for the job - to be implemented when ModifyJobTimeout becomes available
* Extend the timeout for the job by setting a new timeout
*/
// extendJobTimeout()
modifyJobTimeout: ({
newTimeoutMs,
}: {
newTimeoutMs: number
}) => Promise<Response<string>>
}

export interface ZeebeJob<
Expand Down

0 comments on commit 3856118

Please sign in to comment.