Skip to content

Commit

Permalink
Merge pull request camunda#199 from camunda/test-msg-correlation
Browse files Browse the repository at this point in the history
test(zeebe): add message correlation test
  • Loading branch information
jwulf authored Jun 25, 2024
2 parents 50c5954 + 88ca057 commit 5c72d0d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 23 deletions.
58 changes: 58 additions & 0 deletions src/__tests__/testdata/message-correlation-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_04hax45" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="message-correlation-test" name="Message Correlation Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Message Correlation Test">
<bpmn:extensionElements>
<zeebe:properties>
<zeebe:property name="camundaModeler:exampleOutputJson" value="{&#10; &#34;orderId&#34;: 1&#10;}" />
</zeebe:properties>
</bpmn:extensionElements>
<bpmn:outgoing>Flow_0jv0pvp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0jv0pvp" sourceRef="StartEvent_1" targetRef="Event_11031ys" />
<bpmn:endEvent id="Event_1dj077m" name="Message Correlation Test Complete">
<bpmn:incoming>Flow_1vv5mu1</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1vv5mu1" sourceRef="Event_11031ys" targetRef="Event_1dj077m" />
<bpmn:intermediateCatchEvent id="Event_11031ys" name="Correlate Incoming Message">
<bpmn:incoming>Flow_0jv0pvp</bpmn:incoming>
<bpmn:outgoing>Flow_1vv5mu1</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_00g064i" messageRef="Message_0vft2qv" />
</bpmn:intermediateCatchEvent>
</bpmn:process>
<bpmn:message id="Message_0vft2qv" name="MESSAGE_CORRELATION_TEST_CATCH">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=orderId" />
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="message-correlation-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="79" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="159" y="122" width="77" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1dj077m_di" bpmnElement="Event_1dj077m">
<dc:Bounds x="412" y="79" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="392" y="122" width="77" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0fin59b_di" bpmnElement="Event_11031ys">
<dc:Bounds x="292" y="79" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="288" y="122" width="46" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0jv0pvp_di" bpmnElement="Flow_0jv0pvp">
<di:waypoint x="215" y="97" />
<di:waypoint x="292" y="97" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1vv5mu1_di" bpmnElement="Flow_1vv5mu1">
<di:waypoint x="328" y="97" />
<di:waypoint x="412" y="97" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
56 changes: 33 additions & 23 deletions src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { v4 as uuid } from 'uuid'
import { v4 } from 'uuid'

import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
Expand All @@ -14,7 +14,8 @@ beforeAll(
async () =>
({ processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/Client-MessageStart.bpmn',
processFilename:
'./src/__tests__/testdata/message-correlation-test.bpmn',
})
).deployments[0].process)
)
Expand All @@ -25,29 +26,38 @@ afterAll(async () => {
await cancelProcesses(processDefinitionKey)
})

test('Can publish a message', () =>
new Promise((done) => {
const randomId = uuid()

// Wait 1 second to make sure the deployment is complete
new Promise((res) => setTimeout(() => res(null), 1000)).then(() => {
zbc.publishMessage({
name: 'MSG-START_JOB',
test('Can correlate a message with a running process instance', async () => {
// Wait 1 second to make sure the deployment is complete, and distribute to all brokers
await new Promise((res) => setTimeout(() => res(null), 1000))

// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
// Generate a random uuid for the process "orderId" variable
const thisOrderIdValue = v4()
// Start a new process instance, and wait for the result - but asynchronously
zbc
.createProcessInstanceWithResult({
bpmnProcessId: 'message-correlation-test',
variables: {
testKey: randomId,
orderId: thisOrderIdValue,
},
correlationKey: 'something',
})

zbc.createWorker({
taskType: 'console-log-msg-start',
taskHandler: async (job) => {
const res = await job.complete()
expect(job.variables.testKey).toBe(randomId) // Makes sure the worker isn't responding to another message
done(null)
return res
},
loglevel: 'NONE',
.then((res) => {
// This code will run after the process instance has completed
expect(res.variables.orderId).toBe(thisOrderIdValue)
resolve(null)
})
// Execution continues WITHOUT waiting for the process instance to complete
// Publish the message to the process instance. Set the TTL to 5 seconds, because this will execute
// milliseconds after calling createPostInstanceWithResult, and the process will probably not have
// started yet.
const messageResponse = await zbc.publishMessage({
// Although this field is called 'correlationKey', it is actually the *value* of the variable
// specified in the process model. The correlationKey in the BPMN message definition is the *name* of the variable.
correlationKey: thisOrderIdValue,
name: 'MESSAGE_CORRELATION_TEST_CATCH',
timeToLive: 5000,
})
}))
expect(messageResponse.key).toBeDefined()
})
})

0 comments on commit 5c72d0d

Please sign in to comment.