Skip to content

Commit

Permalink
feat(camunda8): implement deployResources REST API
Browse files Browse the repository at this point in the history
  • Loading branch information
jwulf committed Sep 17, 2024
1 parent 7e28fe5 commit 6ee8544
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 232 deletions.
523 changes: 339 additions & 184 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
"dayjs": "^1.8.15",
"debug": "^4.3.4",
"fast-xml-parser": "^4.1.3",
"form-data": "^4.0.0",
"got": "^11.8.6",
"jwt-decode": "^4.0.0",
"lodash.mergewith": "^4.6.2",
Expand All @@ -159,4 +160,4 @@
"typed-duration": "^1.0.12",
"uuid": "^7.0.3"
}
}
}
23 changes: 13 additions & 10 deletions src/__tests__/c8/rest/activateJobs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ const grpcClient = new ZeebeGrpcClient({
const restClient = new C8RestClient()

beforeAll(async () => {
res = await grpcClient.deployResource({
processFilename: './src/__tests__/testdata/hello-world-complete.bpmn',
})
res = (await restClient.deployResourcesFromFiles([
'./src/__tests__/testdata/hello-world-complete.bpmn',
])) as unknown as DeployResourceResponse<ProcessDeployment>
// res = await grpcClient.deployResource({
// processFilename: './src/__tests__/testdata/hello-world-complete.bpmn',
// })
console.log(res)
bpmnProcessId = res.deployments[0].process.bpmnProcessId
})

Expand All @@ -29,10 +33,11 @@ test('Can service a task', (done) => {
grpcClient
.createProcessInstance({
bpmnProcessId,
variables: {},
variables: {
someNumberField: 8,
},
})
.then((r) => {
console.log(r)
.then(() => {
restClient
.activateJobs({
maxJobsToActivate: 2,
Expand All @@ -43,10 +48,8 @@ test('Can service a task', (done) => {
})
.then((jobs) => {
expect(jobs.length).toBe(1)
console.log(jobs)
const res = jobs.map((job) => job.complete())
console.log(res)
Promise.all(res).then(() => done())

jobs[0].complete().then(() => done())
})
})
})
1 change: 0 additions & 1 deletion src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ export const cleanUp = async () => {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log(e)
if (!(e as Error).message.startsWith('5 NOT_FOUND')) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
Expand Down
32 changes: 32 additions & 0 deletions src/__tests__/lib/LosslessJsonParser.unit.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,35 @@ test('LosslessJsonParser handles subkeys', () => {
console.log(parsed)
expect(parsed[0].key).toBe(2251799813737371)
})

test('LosslessJsonParser will throw if given stringified JSON with an unsafe integer number', () => {
let threw = false
const json = `{"unsafeNumber": 9223372036854775808}` // Unsafe integer (greater than Int64 max)

try {
losslessParse(json) // Attempt to parse un-mapped JSON directly
} catch (e) {
threw = true
expect((e as Error).message.includes('unsafe number value')).toBe(true)
}

expect(threw).toBe(true)
})

test('LosslessJsonParser will throw if given stringified JSON with an unsafe integer number, even with a Dto', () => {
let threw = false
const json = `{"unsafeNumber": 9223372036854775808}` // Unsafe integer (greater than Int64 max)

class Dto extends LosslessDto {
unsafeNumber!: number
}

try {
losslessParse(json, Dto) // Attempt to parse mapped JSON without a mapping
} catch (e) {
threw = true
expect((e as Error).message.includes('unsafe number value')).toBe(true)
}

expect(threw).toBe(true)
})
2 changes: 1 addition & 1 deletion src/__tests__/zeebe/integration/Client-integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ test("does not retry to cancel a process instance that doesn't exist", async ()
// See: https://github.com/zeebe-io/zeebe/issues/2680
// await zbc.cancelProcessInstance('123LoL')
try {
await zbc.cancelProcessInstance(2251799813686202)
await zbc.cancelProcessInstance('2251799813686202')
} catch (e: unknown) {
expect((e as Error).message.indexOf('5 NOT_FOUND:')).toBe(0)
}
Expand Down
10 changes: 4 additions & 6 deletions src/c8/lib/C8Dto.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { LosslessNumber } from 'lossless-json'

import { Int64String, LosslessDto } from '../../lib'

export class RestApiJob<
Expand All @@ -10,19 +8,19 @@ export class RestApiJob<
key!: string
type!: string
@Int64String
processInstanceKey!: LosslessNumber
processInstanceKey!: string
bpmnProcessId!: string
processDefinitionVersion!: number
@Int64String
processDefinitionKey!: LosslessNumber
processDefinitionKey!: string
elementId!: string
@Int64String
elementInstanceKey!: LosslessNumber
elementInstanceKey!: string
customHeaders!: CustomHeaders
worker!: string
retries!: number
@Int64String
deadline!: LosslessNumber
deadline!: string
variables!: Variables
tenantId!: string
}
Expand Down
72 changes: 64 additions & 8 deletions src/c8/lib/C8RestClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import fs from 'node:fs'

import { debug } from 'debug'
import FormData from 'form-data'
import got from 'got'

import {
Expand Down Expand Up @@ -86,8 +89,6 @@ export class C8RestClient {
},
})
)

// this.tenantId = config.CAMUNDA_TENANT_ID
}

private async getHeaders() {
Expand Down Expand Up @@ -285,8 +286,8 @@ export class C8RestClient {
headers,
parseJson: (text) => losslessParse(text, jobDto, 'jobs'),
})
.json<{ jobs: Job<VariablesDto, CustomHeadersDto>[] }>()
.then((activatedJobs) => activatedJobs.jobs.map(this.addJobMethods))
.json<Job<VariablesDto, CustomHeadersDto>[]>()
.then((activatedJobs) => activatedJobs.map(this.addJobMethods))
)
}

Expand All @@ -301,7 +302,7 @@ export class C8RestClient {

return this.rest.then((rest) =>
rest
.post(`jobs/${jobKey}/fail`, {
.post(`jobs/${jobKey}/failure`, {
body: losslessStringify(failJobRequest),
headers,
})
Expand All @@ -325,6 +326,7 @@ export class C8RestClient {
.post(`jobs/${jobKey}/error`, {
body: losslessStringify(request),
headers,
parseJson: (text) => losslessParse(text),
})
.then(() => JOB_ACTION_ACKNOWLEDGEMENT)
)
Expand All @@ -341,7 +343,7 @@ export class C8RestClient {

return this.rest.then((rest) =>
rest
.post(`jobs/${jobKey}/complete`, {
.post(`jobs/${jobKey}/completion`, {
body: losslessStringify({ variables: completeJobRequest.variables }),
headers,
})
Expand All @@ -367,13 +369,13 @@ export class C8RestClient {
}

/**
* Marks the incident as resolved; most likely a call to Update job will be necessary to reset the jobs retries, followed by this call.
* Marks the incident as resolved; most likely a call to Update job will be necessary to reset the job's retries, followed by this call.
*/
public async resolveIncident(incidentKey: string) {
const headers = await this.getHeaders()

return this.rest.then((rest) =>
rest.post(`incidents/${incidentKey}/resolve`, {
rest.post(`incidents/${incidentKey}/resolution`, {
headers,
})
)
Expand Down Expand Up @@ -413,6 +415,60 @@ export class C8RestClient {
)
}

/**
* Deploy resources to the broker
*/
/**
* Deploy resources to the broker.
* @param resources - An array of binary data buffers representing the resources to deploy.
* @param tenantId - Optional tenant ID to deploy the resources to. If not provided, the default tenant ID is used.
*/
public async deployResources(
resources: { content: string; name: string }[],
tenantId?: string
) {
const headers = await this.getHeaders()
const formData = new FormData()

resources.forEach((resource) => {
formData.append(`resources`, resource.content, {
filename: resource.name,
})
})

if (tenantId || this.tenantId) {
formData.append('tenantId', tenantId ?? this.tenantId)
}

return this.rest.then((rest) =>
rest
.post('deployments', {
body: formData,
headers: {
...headers,
...formData.getHeaders(),
Accept: 'application/json',
},
parseJson: (text) => losslessParse(text),
})
.json()
)
}

public async deployResourcesFromFiles(filenames: string[]) {
const resources: { content: string; name: string }[] = []

for (const filename of filenames) {
// const resource = await fs.promises.readFile(filename)
resources.push({
content: fs.readFileSync(filename, { encoding: 'binary' }),
name: filename,
})
}

return this.deployResources(resources)
}

private addJobMethods = <Variables, CustomHeaders>(
job: Job<Variables, CustomHeaders>
): Job<Variables, CustomHeaders> &
Expand Down
Loading

0 comments on commit 6ee8544

Please sign in to comment.