Skip to content

Commit

Permalink
feat(camunda8): complete deployResources feature
Browse files Browse the repository at this point in the history
  • Loading branch information
jwulf committed Sep 18, 2024
1 parent 6ee8544 commit 36ace59
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 38 deletions.
28 changes: 9 additions & 19 deletions src/__tests__/c8/rest/activateJobs.spec.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,27 @@
import path from 'node:path'

import { DeployResourceResponse } from '../../../c8/lib/C8Dto'
import { C8RestClient } from '../../../c8/lib/C8RestClient'
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { DeployResourceResponse, ProcessDeployment } from '../../../zeebe/types'

suppressZeebeLogging()
let res: DeployResourceResponse<ProcessDeployment>
let res: DeployResourceResponse
let bpmnProcessId: string
const grpcClient = new ZeebeGrpcClient({
config: {
CAMUNDA_TENANT_ID: '<default>',
},
})
const restClient = new C8RestClient()

beforeAll(async () => {
res = (await restClient.deployResourcesFromFiles([
'./src/__tests__/testdata/hello-world-complete.bpmn',
])) as unknown as DeployResourceResponse<ProcessDeployment>
// res = await grpcClient.deployResource({
// processFilename: './src/__tests__/testdata/hello-world-complete.bpmn',
// })
console.log(res)
bpmnProcessId = res.deployments[0].process.bpmnProcessId
res = await restClient.deployResourcesFromFiles([
path.join('.', 'src', '__tests__', 'testdata', 'hello-world-complete.bpmn'),
])
bpmnProcessId = res.processes[0].bpmnProcessId
})

afterAll(async () => {
restoreZeebeLogging()
await grpcClient.close()
})

test('Can service a task', (done) => {
grpcClient
restClient
.createProcessInstance({
bpmnProcessId,
variables: {
Expand All @@ -48,7 +39,6 @@ test('Can service a task', (done) => {
})
.then((jobs) => {
expect(jobs.length).toBe(1)

jobs[0].complete().then(() => done())
})
})
Expand Down
6 changes: 3 additions & 3 deletions src/__tests__/oauth/OAuthProvider.unit.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ describe('OAuthProvider', () => {
})

it('Uses form encoding for request', (done) => {
const serverPort3001 = 3001
const serverPort3010 = 3010
const o = new OAuthProvider({
config: {
CAMUNDA_ZEEBE_OAUTH_AUDIENCE: 'token',
ZEEBE_CLIENT_ID: 'clientId8',
ZEEBE_CLIENT_SECRET: 'clientSecret',
CAMUNDA_OAUTH_URL: `http://127.0.0.1:${serverPort3001}`,
CAMUNDA_OAUTH_URL: `http://127.0.0.1:${serverPort3010}`,
},
})
const secret = 'YOUR_SECRET'
Expand All @@ -281,7 +281,7 @@ describe('OAuthProvider', () => {
})
}
})
.listen(serverPort3001)
.listen(serverPort3010)
o.getToken('OPERATE')
})

Expand Down
80 changes: 80 additions & 0 deletions src/c8/lib/C8Dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,83 @@ export interface NewUserInfo {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Ctor<T> = new (obj: any) => T

export class ProcessDeployment extends LosslessDto {
bpmnProcessId!: string
version!: number
@Int64String
processDefinitionKey!: string
resourceName!: string
tenantId!: string
}

export class DecisionDeployment extends LosslessDto {
dmnDecisionId!: string
version!: number
@Int64String
decisionKey!: string
dmnDecisionName!: string
tenantId!: string
dmnDecisionRequirementsId!: string
@Int64String
dmnDecisionRequirementsKey!: string
}

export class DecisionRequirementsDeployment extends LosslessDto {
dmnDecisionRequirementsId!: string
version!: number
dmnDecisionRequirementsName!: string
tenantId!: string
@Int64String
dmnDecisionRequirementsKey!: string
resourceName!: string
}
export class FormDeployment {
formId!: string
version!: number
@Int64String
formKey!: string
resourceName!: string
tenantId!: string
}

export class DeployResourceResponseDto extends LosslessDto {
@Int64String
key!: string
deployments!: (
| { process: ProcessDeployment }
| { decision: DecisionDeployment }
| { decisionRequirements: DecisionRequirementsDeployment }
| { form: FormDeployment }
)[]
tenantId!: string
}

export class DeployResourceResponse extends DeployResourceResponseDto {
processes!: ProcessDeployment[]
decisions!: DecisionDeployment[]
decisionRequirements!: DecisionRequirementsDeployment[]
forms!: FormDeployment[]
}

export class CreateProcessInstanceResponse extends LosslessDto {
/**
* The unique key identifying the process definition (e.g. returned from a process
* in the DeployResourceResponse message)
*/
readonly processDefinitionKey!: string
/**
* The BPMN process ID of the process definition
*/
readonly bpmnProcessId!: string
/**
* The version of the process; set to -1 to use the latest version
*/
readonly version!: number
@Int64String
readonly processInstanceKey!: string
/**
* the tenant identifier of the created process instance
*/
readonly tenantId!: string
}
133 changes: 117 additions & 16 deletions src/c8/lib/C8RestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import fs from 'node:fs'
import { debug } from 'debug'
import FormData from 'form-data'
import got from 'got'
import { parse, stringify } from 'lossless-json'

import {
CamundaEnvironmentConfigurator,
Expand All @@ -24,19 +25,32 @@ import { IOAuthProvider } from '../../oauth'
import {
ActivateJobsRequest,
CompleteJobRequest,
CreateProcessInstanceRequest,
CreateProcessInstanceReq,
ErrorJobWithVariables,
FailJobRequest,
IProcessVariables,
Job,
JOB_ACTION_ACKNOWLEDGEMENT,
JobCompletionInterfaceRest,
JSONDoc,
PublishMessageRequest,
PublishMessageResponse,
TopologyResponse,
} from '../../zeebe/types'

import { Ctor, JobUpdateChangeset, NewUserInfo, TaskChangeSet } from './C8Dto'
import {
CreateProcessInstanceResponse,
Ctor,
DecisionDeployment,
DecisionRequirementsDeployment,
DeployResourceResponse,
DeployResourceResponseDto,
FormDeployment,
JobUpdateChangeset,
NewUserInfo,
ProcessDeployment,
TaskChangeSet,
} from './C8Dto'
import { createSpecializedRestApiJobClass } from './RestApiJobClassFactory'

const trace = debug('camunda:zeebe')
Expand Down Expand Up @@ -384,14 +398,20 @@ export class C8RestClient {
/**
* Create and start a process instance
*/
public async createProcessInstance(request: CreateProcessInstanceRequest) {
public async createProcessInstance<T extends JSONDoc>(
request: CreateProcessInstanceReq<T>
) {
const headers = await this.getHeaders()

return this.rest.then((rest) =>
rest.post(`process-instances`, {
body: losslessStringify(this.addDefaultTenantId(request)),
headers,
})
rest
.post(`process-instances`, {
body: losslessStringify(this.addDefaultTenantId(request)),
headers,
parseJson: (text) =>
losslessParse(text, CreateProcessInstanceResponse),
})
.json<CreateProcessInstanceResponse>()
)
}

Expand Down Expand Up @@ -420,7 +440,7 @@ export class C8RestClient {
*/
/**
* Deploy resources to the broker.
* @param resources - An array of binary data buffers representing the resources to deploy.
* @param resources - An array of binary data strings representing the resources to deploy.
* @param tenantId - Optional tenant ID to deploy the resources to. If not provided, the default tenant ID is used.
*/
public async deployResources(
Expand All @@ -440,7 +460,7 @@ export class C8RestClient {
formData.append('tenantId', tenantId ?? this.tenantId)
}

return this.rest.then((rest) =>
const res = await this.rest.then((rest) =>
rest
.post('deployments', {
body: formData,
Expand All @@ -449,20 +469,101 @@ export class C8RestClient {
...formData.getHeaders(),
Accept: 'application/json',
},
parseJson: (text) => losslessParse(text),
parseJson: (text) => parse(text), // we parse the response with LosslessNumbers, with no Dto
})
.json()
.json<DeployResourceResponseDto>()
)

/**
* Now we need to examine the response and parse the deployments to lossless Dtos
* We dynamically construct the response object for the caller, by examining the lossless response
* and re-parsing each of the deployments with the correct Dto.
*/
const deploymentResponse = new DeployResourceResponse({
key: res.key.toString(),
tenantId: res.tenantId,
deployments: [],
processes: [],
decisions: [],
decisionRequirements: [],
forms: [],
})

/**
* Type-guard assertions to correctly type the deployments. The API returns an array with mixed types.
*/
const isProcessDeployment = (
deployment
): deployment is { process: ProcessDeployment } => !!deployment.process
const isDecisionDeployment = (
deployment
): deployment is { decision: DecisionDeployment } => !!deployment.decision
const isDecisionRequirementsDeployment = (
deployment
): deployment is { decisionRequirements: DecisionRequirementsDeployment } =>
!!deployment.decisionRequirements
const isFormDeployment = (
deployment
): deployment is { form: FormDeployment } => !!deployment.form

/**
* Here we examine each of the deployments returned from the API, and create a correctly typed
* object for each one. We also populate subkeys per type. This allows SDK users to work with
* types known ahead of time.
*/
res.deployments.forEach((deployment) => {
if (isProcessDeployment(deployment)) {
const processDeployment = losslessParse(
stringify(deployment.process)!,
ProcessDeployment
)
deploymentResponse.deployments.push({ process: processDeployment })
deploymentResponse.processes.push(processDeployment)
}
if (isDecisionDeployment(deployment)) {
const decisionDeployment = losslessParse(
stringify(deployment)!,
DecisionDeployment
)
deploymentResponse.deployments.push({ decision: decisionDeployment })
deploymentResponse.decisions.push(decisionDeployment)
}
if (isDecisionRequirementsDeployment(deployment)) {
const decisionRequirementsDeployment = losslessParse(
stringify(deployment)!,
DecisionRequirementsDeployment
)
deploymentResponse.deployments.push({
decisionRequirements: decisionRequirementsDeployment,
})
deploymentResponse.decisionRequirements.push(
decisionRequirementsDeployment
)
}
if (isFormDeployment(deployment)) {
const formDeployment = losslessParse(
stringify(deployment)!,
FormDeployment
)
deploymentResponse.deployments.push({ form: formDeployment })
deploymentResponse.forms.push(formDeployment)
}
})

return deploymentResponse
}

public async deployResourcesFromFiles(filenames: string[]) {
/**
* Deploy resources to Camunda 8 from files
* @param files an array of file paths
*/
public async deployResourcesFromFiles(files: string[]) {
const resources: { content: string; name: string }[] = []

for (const filename of filenames) {
// const resource = await fs.promises.readFile(filename)
for (const file of files) {
resources.push({
content: fs.readFileSync(filename, { encoding: 'binary' }),
name: filename,
content: fs.readFileSync(file, { encoding: 'binary' }),
name: file,
})
}

Expand Down

0 comments on commit 36ace59

Please sign in to comment.