Skip to content

Commit

Permalink
Merge pull request #282 from camunda/fix-229
Browse files Browse the repository at this point in the history
fix(zeebe): update JS GPRC lib dependency, release stream resources on end.

Fixes #229
Fixes #281
  • Loading branch information
jwulf authored Dec 19, 2024
2 parents c427878 + d4b6773 commit c3cfb18
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 37 deletions.
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ For `int64` values whose type is not known ahead of time, such as job variables,

Calls to APIs can be authorized using basic auth or via OAuth - a token that is obtained via a client id/secret pair exchange.

### Disable Auth
### Disable OAuth

To disable OAuth, set the environment variable `CAMUNDA_OAUTH_DISABLED=true`. You can use this when running against a minimal Zeebe broker in a development environment, for example.
To disable OAuth, set the environment variable `CAMUNDA_OAUTH_DISABLED=true`. You can use this when running against a minimal Zeebe broker in a development environment, for example. You can also use this when your authentication is being done using an x509 mTLS certificate.

### Basic Auth

Expand All @@ -79,6 +79,15 @@ CAMUNDA_BASIC_AUTH_USERNAME=....
CAMUNDA_BASIC_AUTH_PASSWORD=...
```

### Bearer Token Auth

To use a Bearer token that you have already obtained, set the following value:

```bash
CAMUNDA_AUTH_STRATEGY=BEARER
CAMUNDA_OAUTH_TOKEN=....
```

### OAuth

If your platform is secured with OAuth token exchange (Camunda SaaS or Self-Managed with Identity), provide the following configuration fields at a minimum, either via the `Camunda8` constructor or in environment variables:
Expand Down
9 changes: 5 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
"win-ca": "3.5.1"
},
"dependencies": {
"@grpc/grpc-js": "1.10.9",
"@grpc/grpc-js": "1.12.5",
"@grpc/proto-loader": "0.7.13",
"@types/form-data": "^2.2.1",
"chalk": "^2.4.2",
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/modeler/modeler-integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fs from 'fs'

import { ModelerApiClient } from '../../modeler'

jest.setTimeout(10000)
describe('ModelerApiClient', () => {
let modeler: ModelerApiClient

Expand Down
8 changes: 8 additions & 0 deletions src/__tests__/zeebe/integration-rest/UserTask-rest.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { TasklistApiClient } from '../../../tasklist'
import { ZeebeGrpcClient, ZeebeRestClient } from '../../../zeebe'

jest.setTimeout(30000)

suppressZeebeLogging()

afterAll(() => {
restoreZeebeLogging()
})

test('can update a task', async () => {
const grpc = new ZeebeGrpcClient()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ beforeEach(() => {
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
} finally {
await zbc.close() // Makes sure we don't forget to close connection
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
await zbc.close() // Makes sure we don't forget to close connection
})

test('can update Job Timeout', async () => {
Expand Down
10 changes: 3 additions & 7 deletions src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ beforeEach(() => {
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
} finally {
await zbc.close() // Makes sure we don't forget to close connection
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
await zbc.close() // Makes sure we don't forget to close connection
})

test('Decrements the retries count by default', async () => {
Expand Down
7 changes: 2 additions & 5 deletions src/__tests__/zeebe/integration/Worker-Failure.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ beforeAll(async () => {
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
})

Expand Down
2 changes: 1 addition & 1 deletion src/lib/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ const getZeebeEnv = () =>
ZEEBE_GRPC_WORKER_LONGPOLL_SECONDS: {
type: 'number',
optional: true,
default: 60,
default: 50, // At 60 seconds, SaaS will throw a 504 Gateway Timeout
},
/** After a long poll Job Activation request, this is the cool-off period in milliseconds before the worker requests more work. Defaults to 300 */
ZEEBE_GRPC_WORKER_POLL_INTERVAL_MS: {
Expand Down
35 changes: 28 additions & 7 deletions src/zeebe/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,38 @@ export class GrpcClient extends EventEmitter {
}
}

// Free the stream resources. When it emits 'end', we remove all listeners and destroy it.
stream.on('end', () => {
stream.removeAllListeners()
stream.destroy()
})

// This deals with the case where during a broker restart the call returns a stream
// but that stream is not a legit Gateway activation. In that case, the Gateway will
// never time out or close the stream. So we have to manage that case.
// Also, no end event is emitted when there are no jobs, so we have to emit it ourselves.
const clientsideTimeoutDuration =
Duration.milliseconds.from(this.longPoll!) + 1000
const clientSideTimeout = setTimeout(() => {
debug(
`Triggered client-side timeout after ${clientsideTimeoutDuration}ms`
)
stream.emit('end')
}, clientsideTimeoutDuration)
Duration.milliseconds.from(
timeNormalisedRequest.requestTimeout ?? this.longPoll!
) + 1000

// We will not trigger a client-side timeout if the requestTimeout was set to 0 or -1, as we don't know how long to wait
// This could result in resource leaks when using a worker with a longPoll set to -1 or 0.
const clientSideTimeout =
timeNormalisedRequest.requestTimeout !== 0 &&
timeNormalisedRequest.requestTimeout !== -1
? setTimeout(() => {
if (
clientsideTimeoutDuration !== 1000 &&
clientsideTimeoutDuration !== 999
) {
debug(
`Triggered client-side timeout after ${clientsideTimeoutDuration}ms`
)
stream.emit('end')
}
}, clientsideTimeoutDuration)
: 0

/**
* Once this gets attached here, it is attached to *all* calls
Expand Down
8 changes: 6 additions & 2 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,11 @@ export class ZBWorkerBase<
() => this.poll(),
Duration.milliseconds.from(this.pollInterval)
)
debug(`Created worker for task type ${taskType}`)
debug(
`Created worker for task type ${taskType} - polling interval ${Duration.milliseconds.from(
this.pollInterval
)}`
)
}

/**
Expand Down Expand Up @@ -551,7 +555,7 @@ You should call only one job action method in the worker handler. This is a bug

const amount = this.maxJobsToActivate - this.activeJobs

const requestTimeout = this.longPoll || -1
const requestTimeout = this.longPoll ?? -1

const activateJobsRequest: ActivateJobsRequest = {
maxJobsToActivate: amount,
Expand Down
2 changes: 1 addition & 1 deletion src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ export interface ZBGrpc extends GrpcClient {
completeJobSync: (req: CompleteJobRequest) => Promise<void>
activateJobsStream: (
req: ActivateJobsRequest
) => ClientReadableStream<unknown>
) => Promise<ClientReadableStream<unknown>>
publishMessageSync(
publishMessageRequest: PublishMessageRequest
): Promise<PublishMessageResponse>
Expand Down

0 comments on commit c3cfb18

Please sign in to comment.