diff --git a/src/__tests__/testdata/message-correlation-test.bpmn b/src/__tests__/testdata/message-correlation-test.bpmn new file mode 100644 index 00000000..85ea2421 --- /dev/null +++ b/src/__tests__/testdata/message-correlation-test.bpmn @@ -0,0 +1,58 @@ + + + + + + + + + + Flow_0jv0pvp + + + + Flow_1vv5mu1 + + + + Flow_0jv0pvp + Flow_1vv5mu1 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts b/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts index a764a1b4..80cd6f7a 100644 --- a/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts +++ b/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts @@ -1,4 +1,4 @@ -import { v4 as uuid } from 'uuid' +import { v4 } from 'uuid' import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib' import { ZeebeGrpcClient } from '../../../zeebe' @@ -14,7 +14,8 @@ beforeAll( async () => ({ processDefinitionKey } = ( await zbc.deployResource({ - processFilename: './src/__tests__/testdata/Client-MessageStart.bpmn', + processFilename: + './src/__tests__/testdata/message-correlation-test.bpmn', }) ).deployments[0].process) ) @@ -25,29 +26,38 @@ afterAll(async () => { await cancelProcesses(processDefinitionKey) }) -test('Can publish a message', () => - new Promise((done) => { - const randomId = uuid() - - // Wait 1 second to make sure the deployment is complete - new Promise((res) => setTimeout(() => res(null), 1000)).then(() => { - zbc.publishMessage({ - name: 'MSG-START_JOB', +test('Can correlate a message with a running process instance', async () => { + // Wait 1 second to make sure the deployment is complete, and distribute to all brokers + await new Promise((res) => setTimeout(() => res(null), 1000)) + + // eslint-disable-next-line no-async-promise-executor + await new Promise(async (resolve) => { + // Generate a random uuid for the process "orderId" variable + const thisOrderIdValue = v4() + // Start a new process instance, and wait for the result - but asynchronously + zbc + .createProcessInstanceWithResult({ + bpmnProcessId: 'message-correlation-test', variables: { - testKey: randomId, + orderId: thisOrderIdValue, }, - correlationKey: 'something', }) - - zbc.createWorker({ - taskType: 'console-log-msg-start', - taskHandler: async (job) => { - const res = await job.complete() - expect(job.variables.testKey).toBe(randomId) // Makes sure the worker isn't responding to another message - done(null) - return res - }, - loglevel: 'NONE', + .then((res) => { + // This code will run after the process instance has completed + expect(res.variables.orderId).toBe(thisOrderIdValue) + resolve(null) }) + // Execution continues WITHOUT waiting for the process instance to complete + // Publish the message to the process instance. Set the TTL to 5 seconds, because this will execute + // milliseconds after calling createPostInstanceWithResult, and the process will probably not have + // started yet. + const messageResponse = await zbc.publishMessage({ + // Although this field is called 'correlationKey', it is actually the *value* of the variable + // specified in the process model. The correlationKey in the BPMN message definition is the *name* of the variable. + correlationKey: thisOrderIdValue, + name: 'MESSAGE_CORRELATION_TEST_CATCH', + timeToLive: 5000, }) - })) + expect(messageResponse.key).toBeDefined() + }) +})