From d8139e6ce7cf4517268889d9a963272b238853ae Mon Sep 17 00:00:00 2001 From: Josh Wulf Date: Wed, 12 Jun 2024 14:48:02 +1200 Subject: [PATCH] * feat(zeebe): deployResources to deploy multiple resources atomically * docs(zeebe): document multi-tenant workers * feat(zeebe): deployResources to deploy multiple resources atomically add deployResources method fixes #173 --- README.md | 28 +++ .../integration/Client-DeployResource.spec.ts | 15 ++ src/zeebe/lib/deployResource.ts | 56 ++++++ src/zeebe/zb/ZeebeGrpcClient.ts | 178 +++++++----------- 4 files changed, 163 insertions(+), 114 deletions(-) create mode 100644 src/zeebe/lib/deployResource.ts diff --git a/README.md b/README.md index 65e3d797..21e78ed6 100644 --- a/README.md +++ b/README.md @@ -281,3 +281,31 @@ Please note that only jobs that become available _after_ the stream is opened ar In this release, this is not handled for you. You must both poll and stream jobs to make sure that you get jobs that were available before your application started as well as jobs that become available after your application starts. In a subsequent release, the ZeebeWorker will transparently handle this for you. + +## Multi-tenant workers + +Workers, both polling and streaming, can be multi-tenanted, requesting jobs from more than one tenant. + +Example: + +```typescript +client.createWorker({ + taskHandler: (job) => { + console.log(job.tenantId) // '' | 'green' + return job.complete() + }, + taskType: 'multi-tenant-work', + tenantIds: ['', 'green'], +}) + +client.streamJobs({ + taskHandler: async (job) => { + console.log(job.tenantId) // '' | 'green' + return job.complete() + }, + type: 'multi-tenant-stream-work', + tenantIds: ['', 'green'], + worker: 'stream-worker', + timeout: 2000, +}) +``` diff --git a/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts b/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts index ccfb9de9..3f6419a9 100644 --- a/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts +++ b/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts @@ -61,3 +61,18 @@ test('deploys a Form', async () => { }) expect(result.deployments[0].form).not.toBeNull() }) +test.only('deploys multiple resources', async () => { + const result = await zbc.deployResources([ + { + processFilename: './src/__tests__/testdata/Client-DeployWorkflow.bpmn', + }, + { + decisionFilename: './src/__tests__/testdata/quarantine-duration.dmn', + }, + { + form: fs.readFileSync('./src/__tests__/testdata/form_1.form'), + name: 'form_1.form', + }, + ]) + expect(result.deployments.length).toBe(4) +}) diff --git a/src/zeebe/lib/deployResource.ts b/src/zeebe/lib/deployResource.ts new file mode 100644 index 00000000..aef04bfd --- /dev/null +++ b/src/zeebe/lib/deployResource.ts @@ -0,0 +1,56 @@ +import { readFileSync } from 'fs' + +export type Resource = + | { name: string; process: Buffer; tenantId?: string } + | { processFilename: string; tenantId?: string } + | { name: string; decision: Buffer; tenantId?: string } + | { decisionFilename: string; tenantId?: string } + | { name: string; form: Buffer; tenantId?: string } + | { formFilename: string; tenantId?: string } + +const isProcess = ( + maybeProcess: any // eslint-disable-line @typescript-eslint/no-explicit-any +): maybeProcess is { process: Buffer; name: string } => !!maybeProcess.process +const isProcessFilename = ( + maybeProcessFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any +): maybeProcessFilename is { processFilename: string } => + !!maybeProcessFilename.processFilename +const isDecision = ( + maybeDecision: any // eslint-disable-line @typescript-eslint/no-explicit-any +): maybeDecision is { decision: Buffer; name: string } => + !!maybeDecision.decision +const isDecisionFilename = ( + maybeDecisionFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any +): maybeDecisionFilename is { decisionFilename: string } => + !!maybeDecisionFilename.decisionFilename +// default fall-through +/* const isForm = ( maybeForm: any ): maybeForm is { form: Buffer; name: string } => + !!maybeForm.form + */ +const isFormFilename = ( + maybeFormFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any +): maybeFormFilename is { formFilename: string } => + !!maybeFormFilename.formFilename + +export function getResourceContentAndName(resource: Resource) { + if (isProcessFilename(resource)) { + const filename = resource.processFilename + const process = readFileSync(filename) + return { content: process, name: filename } + } else if (isProcess(resource)) { + return { content: resource.process, name: resource.name } + } else if (isDecisionFilename(resource)) { + const filename = resource.decisionFilename + const decision = readFileSync(filename) + return { content: decision, name: filename } + } else if (isDecision(resource)) { + return { content: resource.decision, name: resource.name } + } else if (isFormFilename(resource)) { + const filename = resource.formFilename + const form = readFileSync(filename) + return { content: form, name: filename } + } /* if (isForm(resource)) */ else { + // default fall-through + return { content: resource.form, name: resource.name } + } +} diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 5ebfbe9c..5b18238b 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -33,6 +33,7 @@ import { StatefulLogInterceptor } from '../lib/StatefulLogInterceptor' import { TypedEmitter } from '../lib/TypedEmitter' import { ZBJsonLogger } from '../lib/ZBJsonLogger' import { ZBStreamWorker } from '../lib/ZBStreamWorker' +import { Resource, getResourceContentAndName } from '../lib/deployResource' import * as ZB from '../lib/interfaces-1.0' import { ZBWorkerTaskHandler } from '../lib/interfaces-1.0' import * as Grpc from '../lib/interfaces-grpc-1.0' @@ -656,8 +657,7 @@ export class ZeebeGrpcClient extends TypedEmitter< /** * - * @description Deploys one or more resources (e.g. processes or decision models) to Zeebe. - * Note that this is an atomic call, i.e. either all resources are deployed, or none of them are. + * @description Deploys a single resources (e.g. process or decision model) to Zeebe. * * Errors: * PERMISSION_DENIED: @@ -696,13 +696,7 @@ export class ZeebeGrpcClient extends TypedEmitter< | { name: string; form: Buffer; tenantId?: string } ): Promise> async deployResource( - resource: - | { name: string; process: Buffer; tenantId?: string } - | { processFilename: string; tenantId?: string } - | { name: string; decision: Buffer; tenantId?: string } - | { decisionFilename: string; tenantId?: string } - | { name: string; form: Buffer; tenantId?: string } - | { formFilename: string; tenantId?: string } + resource: Resource ): Promise< Grpc.DeployResourceResponse< | Grpc.ProcessDeployment @@ -711,111 +705,67 @@ export class ZeebeGrpcClient extends TypedEmitter< | Grpc.FormDeployment > > { - const isProcess = ( - maybeProcess: any // eslint-disable-line @typescript-eslint/no-explicit-any - ): maybeProcess is { process: Buffer; name: string } => - !!maybeProcess.process - const isProcessFilename = ( - maybeProcessFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any - ): maybeProcessFilename is { processFilename: string } => - !!maybeProcessFilename.processFilename - const isDecision = ( - maybeDecision: any // eslint-disable-line @typescript-eslint/no-explicit-any - ): maybeDecision is { decision: Buffer; name: string } => - !!maybeDecision.decision - const isDecisionFilename = ( - maybeDecisionFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any - ): maybeDecisionFilename is { decisionFilename: string } => - !!maybeDecisionFilename.decisionFilename - // default fall-through - /* const isForm = ( maybeForm: any ): maybeForm is { form: Buffer; name: string } => - !!maybeForm.form - */ - const isFormFilename = ( - maybeFormFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any - ): maybeFormFilename is { formFilename: string } => - !!maybeFormFilename.formFilename - - if (isProcessFilename(resource)) { - const filename = resource.processFilename - const process = readFileSync(filename) - return this.executeOperation('deployResource', async () => - (await this.grpc).deployResourceSync({ - resources: [ - { - name: filename, - content: process, - }, - ], - tenantId: resource.tenantId ?? this.tenantId, - }) - ) - } else if (isProcess(resource)) { - return this.executeOperation('deployResource', async () => - (await this.grpc).deployResourceSync({ - resources: [ - { - name: resource.name, - content: resource.process, - }, - ], - tenantId: resource.tenantId ?? this.tenantId, - }) - ) - } else if (isDecisionFilename(resource)) { - const filename = resource.decisionFilename - const decision = readFileSync(filename) - return this.executeOperation('deployResource', async () => - (await this.grpc).deployResourceSync({ - resources: [ - { - name: filename, - content: decision, - }, - ], - tenantId: resource.tenantId ?? this.tenantId, - }) - ) - } else if (isDecision(resource)) { - return this.executeOperation('deployResource', async () => - (await this.grpc).deployResourceSync({ - resources: [ - { - name: resource.name, - content: resource.decision, - }, - ], - tenantId: resource.tenantId ?? this.tenantId, - }) - ) - } else if (isFormFilename(resource)) { - const filename = resource.formFilename - const form = readFileSync(filename) - return this.executeOperation('deployResource', async () => - (await this.grpc).deployResourceSync({ - resources: [ - { - name: filename, - content: form, - }, - ], - tenantId: resource.tenantId ?? this.tenantId, - }) - ) - } /* if (isForm(resource)) */ else { - // default fall-through - return this.executeOperation('deployResource', async () => - (await this.grpc).deployResourceSync({ - resources: [ - { - name: resource.name, - content: resource.form, - }, - ], - tenantId: resource.tenantId ?? this.tenantId, - }) - ) - } + const { content, name } = getResourceContentAndName(resource) + + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ + resources: [ + { + name, + content, + }, + ], + tenantId: resource.tenantId ?? this.tenantId, + }) + ) + } + + /** + * + * @description Deploys one or more resources (e.g. processes or decision models) to Zeebe. + * Note that this is an atomic call, i.e. either all resources are deployed, or none of them are. + * + * Errors: + * PERMISSION_DENIED: + * - if a deployment to an unauthorized tenant is performed + * INVALID_ARGUMENT: + * - no resources given. + * - if at least one resource is invalid. A resource is considered invalid if: + * - the content is not deserializable (e.g. detected as BPMN, but it's broken XML) + * - the content is invalid (e.g. an event-based gateway has an outgoing sequence flow to a task) + * - if multi-tenancy is enabled, and: + * - a tenant id is not provided + * - a tenant id with an invalid format is provided + * - if multi-tenancy is disabled and a tenant id is provided + * @example + * ``` + * const zbc = new ZeebeGrpcClient() + * + * const result = await zbc.deployResources([ + * { + * processFilename: './src/__tests__/testdata/Client-DeployWorkflow.bpmn', + * }, + * { + * decisionFilename: './src/__tests__/testdata/quarantine-duration.dmn', + * }, + * { + * form: fs.readFileSync('./src/__tests__/testdata/form_1.form'), + * name: 'form_1.form', + * }, + * ]) + * ``` + */ + public async deployResources(resources: Resource[], tenantId?: string) { + const resourcesToDeploy = resources.map((r) => { + const { content, name } = getResourceContentAndName(r) + return { name, content } + }) + return this.executeOperation('deployResources', async () => + (await this.grpc).deployResourceSync({ + resources: resourcesToDeploy, + tenantId: tenantId ?? this.tenantId, + }) + ) } /**