diff --git a/README.md b/README.md index cc768fb2..0bcb7a2b 100644 --- a/README.md +++ b/README.md @@ -65,9 +65,9 @@ For `int64` values whose type is not known ahead of time, such as job variables, Calls to APIs can be authorized using basic auth or via OAuth - a token that is obtained via a client id/secret pair exchange. -### Disable Auth +### Disable OAuth -To disable OAuth, set the environment variable `CAMUNDA_OAUTH_DISABLED=true`. You can use this when running against a minimal Zeebe broker in a development environment, for example. +To disable OAuth, set the environment variable `CAMUNDA_OAUTH_DISABLED=true`. You can use this when running against a minimal Zeebe broker in a development environment, for example. You can also use this when your authentication is being done using an x509 mTLS certificate. ### Basic Auth @@ -79,6 +79,15 @@ CAMUNDA_BASIC_AUTH_USERNAME=.... CAMUNDA_BASIC_AUTH_PASSWORD=... ``` +### Bearer Token Auth + +To use a Bearer token that you have already obtained, set the following value: + +```bash +CAMUNDA_AUTH_STRATEGY=BEARER +CAMUNDA_OAUTH_TOKEN=.... +``` + ### OAuth If your platform is secured with OAuth token exchange (Camunda SaaS or Self-Managed with Identity), provide the following configuration fields at a minimum, either via the `Camunda8` constructor or in environment variables: diff --git a/package-lock.json b/package-lock.json index a9885faa..f68632bd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "8.6.16", "license": "Apache-2.0", "dependencies": { - "@grpc/grpc-js": "1.10.9", + "@grpc/grpc-js": "1.12.5", "@grpc/proto-loader": "0.7.13", "@types/form-data": "^2.2.1", "chalk": "^2.4.2", @@ -1429,9 +1429,10 @@ } }, "node_modules/@grpc/grpc-js": { - "version": "1.10.9", - "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.10.9.tgz", - "integrity": "sha512-5tcgUctCG0qoNyfChZifz2tJqbRbXVO9J7X6duFcOjY3HUNCxg5D0ZCK7EP9vIcZ0zRpLU9bWkyCqVCLZ46IbQ==", + "version": "1.12.5", + "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.12.5.tgz", + "integrity": "sha512-d3iiHxdpg5+ZcJ6jnDSOT8Z0O0VMVGy34jAnYLUX8yd36b1qn8f1TwOA/Lc7TsOh03IkPJ38eGI5qD2EjNkoEA==", + "license": "Apache-2.0", "dependencies": { "@grpc/proto-loader": "^0.7.13", "@js-sdsl/ordered-map": "^4.4.2" diff --git a/package.json b/package.json index 7676e587..f2a9aa7d 100644 --- a/package.json +++ b/package.json @@ -142,7 +142,7 @@ "win-ca": "3.5.1" }, "dependencies": { - "@grpc/grpc-js": "1.10.9", + "@grpc/grpc-js": "1.12.5", "@grpc/proto-loader": "0.7.13", "@types/form-data": "^2.2.1", "chalk": "^2.4.2", diff --git a/src/__tests__/modeler/modeler-integration.spec.ts b/src/__tests__/modeler/modeler-integration.spec.ts index 43f58022..328c6949 100644 --- a/src/__tests__/modeler/modeler-integration.spec.ts +++ b/src/__tests__/modeler/modeler-integration.spec.ts @@ -2,6 +2,7 @@ import fs from 'fs' import { ModelerApiClient } from '../../modeler' +jest.setTimeout(10000) describe('ModelerApiClient', () => { let modeler: ModelerApiClient diff --git a/src/__tests__/zeebe/integration-rest/UserTask-rest.spec.ts b/src/__tests__/zeebe/integration-rest/UserTask-rest.spec.ts index e3f7827e..46ad4552 100644 --- a/src/__tests__/zeebe/integration-rest/UserTask-rest.spec.ts +++ b/src/__tests__/zeebe/integration-rest/UserTask-rest.spec.ts @@ -1,7 +1,15 @@ +import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib' import { TasklistApiClient } from '../../../tasklist' import { ZeebeGrpcClient, ZeebeRestClient } from '../../../zeebe' jest.setTimeout(30000) + +suppressZeebeLogging() + +afterAll(() => { + restoreZeebeLogging() +}) + test('can update a task', async () => { const grpc = new ZeebeGrpcClient() diff --git a/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts b/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts index 12b53ddd..f39f2c35 100644 --- a/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts +++ b/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts @@ -17,14 +17,10 @@ beforeEach(() => { }) afterEach(async () => { - try { - if (wf?.processInstanceKey) { - await zbc.cancelProcessInstance(wf.processInstanceKey) - } - } catch (e: unknown) { - } finally { - await zbc.close() // Makes sure we don't forget to close connection + if (wf?.processInstanceKey) { + await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e) } + await zbc.close() // Makes sure we don't forget to close connection }) test('can update Job Timeout', async () => { diff --git a/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts b/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts index 27ec4f58..fe43c242 100644 --- a/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts @@ -17,14 +17,10 @@ beforeEach(() => { }) afterEach(async () => { - try { - if (wf?.processInstanceKey) { - await zbc.cancelProcessInstance(wf.processInstanceKey) - } - } catch (e: unknown) { - } finally { - await zbc.close() // Makes sure we don't forget to close connection + if (wf?.processInstanceKey) { + await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e) } + await zbc.close() // Makes sure we don't forget to close connection }) test('Decrements the retries count by default', async () => { diff --git a/src/__tests__/zeebe/integration/Worker-Failure.spec.ts b/src/__tests__/zeebe/integration/Worker-Failure.spec.ts index 041c7463..c8d17757 100644 --- a/src/__tests__/zeebe/integration/Worker-Failure.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-Failure.spec.ts @@ -53,11 +53,8 @@ beforeAll(async () => { }) afterEach(async () => { - try { - if (wf?.processInstanceKey) { - await zbc.cancelProcessInstance(wf.processInstanceKey) - } - } catch (e: unknown) { + if (wf?.processInstanceKey) { + await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e) } }) diff --git a/src/lib/Configuration.ts b/src/lib/Configuration.ts index 33fcf565..07163d5b 100644 --- a/src/lib/Configuration.ts +++ b/src/lib/Configuration.ts @@ -355,7 +355,7 @@ const getZeebeEnv = () => ZEEBE_GRPC_WORKER_LONGPOLL_SECONDS: { type: 'number', optional: true, - default: 60, + default: 50, // At 60 seconds, SaaS will throw a 504 Gateway Timeout }, /** After a long poll Job Activation request, this is the cool-off period in milliseconds before the worker requests more work. Defaults to 300 */ ZEEBE_GRPC_WORKER_POLL_INTERVAL_MS: { diff --git a/src/zeebe/lib/GrpcClient.ts b/src/zeebe/lib/GrpcClient.ts index 57a92df3..b3f0ba76 100644 --- a/src/zeebe/lib/GrpcClient.ts +++ b/src/zeebe/lib/GrpcClient.ts @@ -339,17 +339,38 @@ export class GrpcClient extends EventEmitter { } } + // Free the stream resources. When it emits 'end', we remove all listeners and destroy it. + stream.on('end', () => { + stream.removeAllListeners() + stream.destroy() + }) + // This deals with the case where during a broker restart the call returns a stream // but that stream is not a legit Gateway activation. In that case, the Gateway will // never time out or close the stream. So we have to manage that case. + // Also, no end event is emitted when there are no jobs, so we have to emit it ourselves. const clientsideTimeoutDuration = - Duration.milliseconds.from(this.longPoll!) + 1000 - const clientSideTimeout = setTimeout(() => { - debug( - `Triggered client-side timeout after ${clientsideTimeoutDuration}ms` - ) - stream.emit('end') - }, clientsideTimeoutDuration) + Duration.milliseconds.from( + timeNormalisedRequest.requestTimeout ?? this.longPoll! + ) + 1000 + + // We will not trigger a client-side timeout if the requestTimeout was set to 0 or -1, as we don't know how long to wait + // This could result in resource leaks when using a worker with a longPoll set to -1 or 0. + const clientSideTimeout = + timeNormalisedRequest.requestTimeout !== 0 && + timeNormalisedRequest.requestTimeout !== -1 + ? setTimeout(() => { + if ( + clientsideTimeoutDuration !== 1000 && + clientsideTimeoutDuration !== 999 + ) { + debug( + `Triggered client-side timeout after ${clientsideTimeoutDuration}ms` + ) + stream.emit('end') + } + }, clientsideTimeoutDuration) + : 0 /** * Once this gets attached here, it is attached to *all* calls diff --git a/src/zeebe/lib/ZBWorkerBase.ts b/src/zeebe/lib/ZBWorkerBase.ts index 0ec7da80..23c5ff67 100644 --- a/src/zeebe/lib/ZBWorkerBase.ts +++ b/src/zeebe/lib/ZBWorkerBase.ts @@ -203,7 +203,11 @@ export class ZBWorkerBase< () => this.poll(), Duration.milliseconds.from(this.pollInterval) ) - debug(`Created worker for task type ${taskType}`) + debug( + `Created worker for task type ${taskType} - polling interval ${Duration.milliseconds.from( + this.pollInterval + )}` + ) } /** @@ -551,7 +555,7 @@ You should call only one job action method in the worker handler. This is a bug const amount = this.maxJobsToActivate - this.activeJobs - const requestTimeout = this.longPoll || -1 + const requestTimeout = this.longPoll ?? -1 const activateJobsRequest: ActivateJobsRequest = { maxJobsToActivate: amount, diff --git a/src/zeebe/lib/interfaces-1.0.ts b/src/zeebe/lib/interfaces-1.0.ts index b3070467..979bd692 100644 --- a/src/zeebe/lib/interfaces-1.0.ts +++ b/src/zeebe/lib/interfaces-1.0.ts @@ -502,7 +502,7 @@ export interface ZBGrpc extends GrpcClient { completeJobSync: (req: CompleteJobRequest) => Promise activateJobsStream: ( req: ActivateJobsRequest - ) => ClientReadableStream + ) => Promise> publishMessageSync( publishMessageRequest: PublishMessageRequest ): Promise