Skip to content

Commit

Permalink
feat(camunda8): implement publishMessage over REST
Browse files Browse the repository at this point in the history
fixes #250
  • Loading branch information
jwulf committed Sep 18, 2024
1 parent f2d78ed commit 008c789
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
19 changes: 19 additions & 0 deletions src/c8/lib/C8Dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,22 @@ export interface UpdateElementVariableRequest {
*/
operationReference?: number
}

export class CorrelateMessageResponse extends LosslessDto {
/** the unique ID of the message that was published */
@Int64String
key!: string
/** the tenantId of the message */
tenantId!: string
/** The key of the first process instance the message correlated with */
@Int64String
processInstanceKey!: string
}

export class PublishMessageResponse extends LosslessDto {
/** the unique ID of the message that was published */
@Int64String
key!: string
/** the tenantId of the message */
tenantId!: string
}
26 changes: 23 additions & 3 deletions src/c8/lib/C8RestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import {
JobCompletionInterfaceRest,
JSONDoc,
PublishMessageRequest,
PublishMessageResponse,
TopologyResponse,
} from '../../zeebe/types'

import {
BroadcastSignalResponse,
CorrelateMessageResponse,
CreateProcessInstanceResponse,
Ctor,
DecisionDeployment,
Expand All @@ -52,6 +52,7 @@ import {
MigrationRequest,
NewUserInfo,
ProcessDeployment,
PublishMessageResponse,
TaskChangeSet,
UpdateElementVariableRequest,
} from './C8Dto'
Expand Down Expand Up @@ -256,16 +257,35 @@ export class C8RestClient {
PublishMessageRequest,
'name' | 'correlationKey' | 'variables' | 'tenantId'
>
): Promise<PublishMessageResponse & { processInstanceKey: string }> {
) {
const headers = await this.getHeaders()

return this.rest.then((rest) =>
rest
.post(`messages/correlation`, {
body: losslessStringify(message),
headers,
parseJson: (text) => losslessParse(text, CorrelateMessageResponse),
})
.json<CorrelateMessageResponse>()
)
}

/**
* Publishes a single message. Messages are published to specific partitions computed from their correlation keys.
* The endpoint does not wait for a correlation result. Use `correlateMessage` for such use cases.
*/
public async publishMessage(publishMessageRequest: PublishMessageRequest) {
const headers = await this.getHeaders()
const request = this.addDefaultTenantId(publishMessageRequest)
return this.rest.then((rest) =>
rest
.post(`messages/publication`, {
headers,
body: stringify(request),
parseJson: (text) => losslessParse(text, PublishMessageResponse),
})
.json()
.json<PublishMessageResponse>()
)
}

Expand Down

0 comments on commit 008c789

Please sign in to comment.