Skip to content

Commit

Permalink
feat(zeebe): support Job Corrections in complete command fixes #347
Browse files Browse the repository at this point in the history
  • Loading branch information
jwulf committed Jan 3, 2025
1 parent 37de21b commit edaddd8
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 3 deletions.
4 changes: 2 additions & 2 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# CAMUNDA_CONNECTORS_VERSION=0.23.2
CAMUNDA_CONNECTORS_VERSION=8.5.0
CAMUNDA_OPTIMIZE_VERSION=8.5.0
CAMUNDA_PLATFORM_VERSION=8.6.0
CAMUNDA_ZEEBE_VERSION=8.6.3
CAMUNDA_PLATFORM_VERSION=8.7.0-alpha2
CAMUNDA_ZEEBE_VERSION=8.7.0-alpha2
CAMUNDA_WEB_MODELER_VERSION=8.5.0
ELASTIC_VERSION=8.9.0
KEYCLOAK_SERVER_VERSION=22.0.3
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/Worker-JobCorrection.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1ob4kc6" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.30.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.6.0">
<bpmn:process id="job-correction" name="Job Correction Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Job Correction Test">
<bpmn:outgoing>Flow_107kyon</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_107kyon" sourceRef="StartEvent_1" targetRef="Activity_01g5b4t" />
<bpmn:serviceTask id="Activity_01g5b4t" name="Job Correction Job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="job-correction" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_107kyon</bpmn:incoming>
<bpmn:outgoing>Flow_0y0j0w9</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:endEvent id="Event_1hmqn2y" name="Job Correction Test Completed Successfully">
<bpmn:incoming>Flow_0y0j0w9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0y0j0w9" sourceRef="Activity_01g5b4t" targetRef="Event_1hmqn2y" />
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="job-correction">
<bpmndi:BPMNShape id="StartEvent_1_di" bpmnElement="StartEvent_1">
<dc:Bounds x="182" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="163" y="145" width="74" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0s25xnu_di" bpmnElement="Activity_01g5b4t">
<dc:Bounds x="270" y="80" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hmqn2y_di" bpmnElement="Event_1hmqn2y">
<dc:Bounds x="422" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="402" y="145" width="77" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_107kyon_di" bpmnElement="Flow_107kyon">
<di:waypoint x="218" y="120" />
<di:waypoint x="270" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0y0j0w9_di" bpmnElement="Flow_0y0j0w9">
<di:waypoint x="370" y="120" />
<di:waypoint x="422" y="120" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
58 changes: 58 additions & 0 deletions src/__tests__/zeebe/integration/Worker-JobCorrection.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'
import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0'

jest.setTimeout(30000)
suppressZeebeLogging()

const zbc = new ZeebeGrpcClient()
let wf: CreateProcessInstanceResponse | undefined
let processDefinitionKey1: string
let bpmnProcessId1: string

beforeAll(async () => {
const res1 = await zbc.deployResource({
processFilename: './src/__tests__/testdata/Worker-JobCorrection.bpmn',
})
;({
processDefinitionKey: processDefinitionKey1,
bpmnProcessId: bpmnProcessId1,
} = res1.deployments[0].process)
await cancelProcesses(processDefinitionKey1)
})

afterEach(async () => {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
})

afterAll(async () => {
await zbc.close()
await cancelProcesses(processDefinitionKey1)
restoreZeebeLogging()
})

test('Can complete a task with job corrections', (done) => {
zbc
.createProcessInstance({
bpmnProcessId: bpmnProcessId1,
variables: {},
})
.then((res) => {
wf = res
zbc.createWorker({
taskType: 'job-correction',
taskHandler: async (job) => {
expect(job.processInstanceKey).toBe(wf?.processInstanceKey)
const res1 = await job.complete({
// @TODO: correction interface
})
done(null)
return res1
},
loglevel: 'NONE',
})
})
})
42 changes: 41 additions & 1 deletion src/proto/zeebe.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ option java_package = "io.camunda.zeebe.gateway.protocol";
option go_package = "./;pb";

// For a more complete documentation, refer to Zeebe documentation at:
// https://docs.camunda.io/docs/reference/grpc
// https://docs.camunda.io/docs/apis-tools/zeebe-api/gateway-service/

message StreamActivatedJobsRequest {
// the job type, as defined in the BPMN process (e.g. <zeebe:taskDefinition
Expand Down Expand Up @@ -101,6 +101,46 @@ message CompleteJobRequest {
int64 jobKey = 1;
// a JSON document representing the variables in the current task scope
string variables = 2;
// The result of the completed job as determined by the worker.
optional JobResult result = 3;
}

message JobResult{
// Indicates whether the worker denies the work, i.e. explicitly doesn't approve it.
// For example, a Task Listener can deny the completion of a task by setting this flag to true.
// In this example, the completion of a task is represented by a job that the worker can complete as denied.
// As a result, the completion request is rejected and the task remains active.
// Defaults to false.
optional bool denied = 1;
// Attributes that were corrected by the worker.
// The following attributes can be corrected, additional attributes will be ignored:
// * `assignee` - reset by providing an empty String
// * `dueDate` - reset by providing an empty String
// * `followUpDate` - reset by providing an empty String
// * `candidateGroups` - reset by providing an empty list
// * `candidateUsers` - reset by providing an empty list
// * `priority` - minimum 0, maximum 100, default 50
// Omitting any of the attributes will preserve the persisted attribute's value.
optional JobResultCorrections corrections = 2;
}

message JobResultCorrections {
// The assignee of the task.
optional string assignee = 1;
// The due date of the task.
optional string dueDate = 2;
// The follow-up date of the task.
optional string followUpDate = 3;
// The list of candidate users of the task.
optional StringList candidateUsers = 4;
// The list of candidate groups of the task.
optional StringList candidateGroups = 5;
// The priority of the task.
optional int32 priority = 6;
}

message StringList {
repeated string values = 1;
}

message CompleteJobResponse {
Expand Down
39 changes: 39 additions & 0 deletions src/zeebe/lib/interfaces-grpc-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,45 @@ export interface ThrowErrorRequest {
export interface CompleteJobRequest<Variables = IProcessVariables> {
readonly jobKey: string
variables: Variables
/** Since Camunda 8.7 */
result?: JobResult
}

/**
* Since Camunda 8.7
*/
export interface JobResult {
// Indicates whether the worker denies the work, i.e. explicitly doesn't approve it.
// For example, a Task Listener can deny the completion of a task by setting this flag to true.
// In this example, the completion of a task is represented by a job that the worker can complete as denied.
// As a result, the completion request is rejected and the task remains active.
// Defaults to false.
denied?: boolean
// Attributes that were corrected by the worker.
// The following attributes can be corrected, additional attributes will be ignored:
// * `assignee` - reset by providing an empty String
// * `dueDate` - reset by providing an empty String
// * `followUpDate` - reset by providing an empty String
// * `candidateGroups` - reset by providing an empty list
// * `candidateUsers` - reset by providing an empty list
// * `priority` - minimum 0, maximum 100, default 50
// Omitting any of the attributes will preserve the persisted attribute's value.
corrections?: JobResultCorrections
}

export interface JobResultCorrections {
// The assignee of the task.
assignee?: string
// The due date of the task.
dueDate?: string
// The follow-up date of the task.
followUpDate?: string
// The list of candidate users of the task.
candidateUsers?: { values: string[] }
// The list of candidate groups of the task.
candidateGroups?: { values: string[] }
// The priority of the task.
priority?: number
}

interface SetVariablesRequestBase {
Expand Down

0 comments on commit edaddd8

Please sign in to comment.