Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alpha #283

Merged
merged 3 commits into from
Dec 19, 2024
Merged

Alpha #283

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ For `int64` values whose type is not known ahead of time, such as job variables,

Calls to APIs can be authorized using basic auth or via OAuth - a token that is obtained via a client id/secret pair exchange.

### Disable Auth
### Disable OAuth

To disable OAuth, set the environment variable `CAMUNDA_OAUTH_DISABLED=true`. You can use this when running against a minimal Zeebe broker in a development environment, for example.
To disable OAuth, set the environment variable `CAMUNDA_OAUTH_DISABLED=true`. You can use this when running against a minimal Zeebe broker in a development environment, for example. You can also use this when your authentication is being done using an x509 mTLS certificate.

### Basic Auth

Expand All @@ -79,6 +79,15 @@ CAMUNDA_BASIC_AUTH_USERNAME=....
CAMUNDA_BASIC_AUTH_PASSWORD=...
```

### Bearer Token Auth

To use a Bearer token that you have already obtained, set the following value:

```bash
CAMUNDA_AUTH_STRATEGY=BEARER
CAMUNDA_OAUTH_TOKEN=....
```

### OAuth

If your platform is secured with OAuth token exchange (Camunda SaaS or Self-Managed with Identity), provide the following configuration fields at a minimum, either via the `Camunda8` constructor or in environment variables:
Expand Down
9 changes: 5 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
"win-ca": "3.5.1"
},
"dependencies": {
"@grpc/grpc-js": "1.10.9",
"@grpc/grpc-js": "1.12.5",
"@grpc/proto-loader": "0.7.13",
"@types/form-data": "^2.2.1",
"chalk": "^2.4.2",
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/modeler/modeler-integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fs from 'fs'

import { ModelerApiClient } from '../../modeler'

jest.setTimeout(10000)
describe('ModelerApiClient', () => {
let modeler: ModelerApiClient

Expand Down
8 changes: 8 additions & 0 deletions src/__tests__/zeebe/integration-rest/UserTask-rest.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { TasklistApiClient } from '../../../tasklist'
import { ZeebeGrpcClient, ZeebeRestClient } from '../../../zeebe'

jest.setTimeout(30000)

suppressZeebeLogging()

afterAll(() => {
restoreZeebeLogging()
})

test('can update a task', async () => {
const grpc = new ZeebeGrpcClient()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ beforeEach(() => {
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
} finally {
await zbc.close() // Makes sure we don't forget to close connection
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
await zbc.close() // Makes sure we don't forget to close connection
})

test('can update Job Timeout', async () => {
Expand Down
10 changes: 3 additions & 7 deletions src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ beforeEach(() => {
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
} finally {
await zbc.close() // Makes sure we don't forget to close connection
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
await zbc.close() // Makes sure we don't forget to close connection
})

test('Decrements the retries count by default', async () => {
Expand Down
7 changes: 2 additions & 5 deletions src/__tests__/zeebe/integration/Worker-Failure.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ beforeAll(async () => {
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
})

Expand Down
2 changes: 1 addition & 1 deletion src/lib/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ const getZeebeEnv = () =>
ZEEBE_GRPC_WORKER_LONGPOLL_SECONDS: {
type: 'number',
optional: true,
default: 60,
default: 50, // At 60 seconds, SaaS will throw a 504 Gateway Timeout
},
/** After a long poll Job Activation request, this is the cool-off period in milliseconds before the worker requests more work. Defaults to 300 */
ZEEBE_GRPC_WORKER_POLL_INTERVAL_MS: {
Expand Down
35 changes: 28 additions & 7 deletions src/zeebe/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,38 @@ export class GrpcClient extends EventEmitter {
}
}

// Free the stream resources. When it emits 'end', we remove all listeners and destroy it.
stream.on('end', () => {
stream.removeAllListeners()
stream.destroy()
})

// This deals with the case where during a broker restart the call returns a stream
// but that stream is not a legit Gateway activation. In that case, the Gateway will
// never time out or close the stream. So we have to manage that case.
// Also, no end event is emitted when there are no jobs, so we have to emit it ourselves.
const clientsideTimeoutDuration =
Duration.milliseconds.from(this.longPoll!) + 1000
const clientSideTimeout = setTimeout(() => {
debug(
`Triggered client-side timeout after ${clientsideTimeoutDuration}ms`
)
stream.emit('end')
}, clientsideTimeoutDuration)
Duration.milliseconds.from(
timeNormalisedRequest.requestTimeout ?? this.longPoll!
) + 1000

// We will not trigger a client-side timeout if the requestTimeout was set to 0 or -1, as we don't know how long to wait
// This could result in resource leaks when using a worker with a longPoll set to -1 or 0.
const clientSideTimeout =
timeNormalisedRequest.requestTimeout !== 0 &&
timeNormalisedRequest.requestTimeout !== -1
? setTimeout(() => {
if (
clientsideTimeoutDuration !== 1000 &&
clientsideTimeoutDuration !== 999
) {
debug(
`Triggered client-side timeout after ${clientsideTimeoutDuration}ms`
)
stream.emit('end')
}
}, clientsideTimeoutDuration)
: 0

/**
* Once this gets attached here, it is attached to *all* calls
Expand Down
8 changes: 6 additions & 2 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,11 @@ export class ZBWorkerBase<
() => this.poll(),
Duration.milliseconds.from(this.pollInterval)
)
debug(`Created worker for task type ${taskType}`)
debug(
`Created worker for task type ${taskType} - polling interval ${Duration.milliseconds.from(
this.pollInterval
)}`
)
}

/**
Expand Down Expand Up @@ -551,7 +555,7 @@ You should call only one job action method in the worker handler. This is a bug

const amount = this.maxJobsToActivate - this.activeJobs

const requestTimeout = this.longPoll || -1
const requestTimeout = this.longPoll ?? -1

const activateJobsRequest: ActivateJobsRequest = {
maxJobsToActivate: amount,
Expand Down
2 changes: 1 addition & 1 deletion src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ export interface ZBGrpc extends GrpcClient {
completeJobSync: (req: CompleteJobRequest) => Promise<void>
activateJobsStream: (
req: ActivateJobsRequest
) => ClientReadableStream<unknown>
) => Promise<ClientReadableStream<unknown>>
publishMessageSync(
publishMessageRequest: PublishMessageRequest
): Promise<PublishMessageResponse>
Expand Down
Loading