Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(zeebe): add message correlation test #199

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/__tests__/testdata/message-correlation-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_04hax45" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="message-correlation-test" name="Message Correlation Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Message Correlation Test">
<bpmn:extensionElements>
<zeebe:properties>
<zeebe:property name="camundaModeler:exampleOutputJson" value="{&#10; &#34;orderId&#34;: 1&#10;}" />
</zeebe:properties>
</bpmn:extensionElements>
<bpmn:outgoing>Flow_0jv0pvp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0jv0pvp" sourceRef="StartEvent_1" targetRef="Event_11031ys" />
<bpmn:endEvent id="Event_1dj077m" name="Message Correlation Test Complete">
<bpmn:incoming>Flow_1vv5mu1</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1vv5mu1" sourceRef="Event_11031ys" targetRef="Event_1dj077m" />
<bpmn:intermediateCatchEvent id="Event_11031ys" name="Correlate Incoming Message">
<bpmn:incoming>Flow_0jv0pvp</bpmn:incoming>
<bpmn:outgoing>Flow_1vv5mu1</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_00g064i" messageRef="Message_0vft2qv" />
</bpmn:intermediateCatchEvent>
</bpmn:process>
<bpmn:message id="Message_0vft2qv" name="MESSAGE_CORRELATION_TEST_CATCH">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=orderId" />
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="message-correlation-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="79" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="159" y="122" width="77" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1dj077m_di" bpmnElement="Event_1dj077m">
<dc:Bounds x="412" y="79" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="392" y="122" width="77" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0fin59b_di" bpmnElement="Event_11031ys">
<dc:Bounds x="292" y="79" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="288" y="122" width="46" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0jv0pvp_di" bpmnElement="Flow_0jv0pvp">
<di:waypoint x="215" y="97" />
<di:waypoint x="292" y="97" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1vv5mu1_di" bpmnElement="Flow_1vv5mu1">
<di:waypoint x="328" y="97" />
<di:waypoint x="412" y="97" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
56 changes: 33 additions & 23 deletions src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { v4 as uuid } from 'uuid'
import { v4 } from 'uuid'

import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
Expand All @@ -14,7 +14,8 @@ beforeAll(
async () =>
({ processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/Client-MessageStart.bpmn',
processFilename:
'./src/__tests__/testdata/message-correlation-test.bpmn',
})
).deployments[0].process)
)
Expand All @@ -25,29 +26,38 @@ afterAll(async () => {
await cancelProcesses(processDefinitionKey)
})

test('Can publish a message', () =>
new Promise((done) => {
const randomId = uuid()

// Wait 1 second to make sure the deployment is complete
new Promise((res) => setTimeout(() => res(null), 1000)).then(() => {
zbc.publishMessage({
name: 'MSG-START_JOB',
test('Can correlate a message with a running process instance', async () => {
// Wait 1 second to make sure the deployment is complete, and distribute to all brokers
await new Promise((res) => setTimeout(() => res(null), 1000))

// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
// Generate a random uuid for the process "orderId" variable
const thisOrderIdValue = v4()
// Start a new process instance, and wait for the result - but asynchronously
zbc
.createProcessInstanceWithResult({
bpmnProcessId: 'message-correlation-test',
variables: {
testKey: randomId,
orderId: thisOrderIdValue,
},
correlationKey: 'something',
})

zbc.createWorker({
taskType: 'console-log-msg-start',
taskHandler: async (job) => {
const res = await job.complete()
expect(job.variables.testKey).toBe(randomId) // Makes sure the worker isn't responding to another message
done(null)
return res
},
loglevel: 'NONE',
.then((res) => {
// This code will run after the process instance has completed
expect(res.variables.orderId).toBe(thisOrderIdValue)
resolve(null)
})
// Execution continues WITHOUT waiting for the process instance to complete
// Publish the message to the process instance. Set the TTL to 5 seconds, because this will execute
// milliseconds after calling createPostInstanceWithResult, and the process will probably not have
// started yet.
const messageResponse = await zbc.publishMessage({
// Although this field is called 'correlationKey', it is actually the *value* of the variable
// specified in the process model. The correlationKey in the BPMN message definition is the *name* of the variable.
correlationKey: thisOrderIdValue,
name: 'MESSAGE_CORRELATION_TEST_CATCH',
timeToLive: 5000,
})
}))
expect(messageResponse.key).toBeDefined()
})
})
Loading