Skip to content

Commit

Permalink
fix(zeebe): fail job if variables cannot be parsed
Browse files Browse the repository at this point in the history
if the variable payload cannot be parsed, fail the job rather than throwing an error

fixes #236
  • Loading branch information
jwulf committed Aug 30, 2024
1 parent 72b42fa commit 495c05e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 34 deletions.
36 changes: 22 additions & 14 deletions src/zeebe/lib/ZBStreamWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,28 @@ export class ZBStreamWorker implements IZBJobWorker {
console.error(e)
})
stream.on('data', (res: ActivatedJob) => {
// Make handlers
const job: Job<WorkerInputVariables, CustomHeaderShape> =
parseVariablesAndCustomHeadersToJSON<
WorkerInputVariables,
CustomHeaderShape
>(res, inputVariableDto, customHeadersDto)
taskHandler(
{
...job,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
...this.makeCompleteHandlers(job as any, req.type),
},
this
)
parseVariablesAndCustomHeadersToJSON<
WorkerInputVariables,
CustomHeaderShape
>(res, inputVariableDto, customHeadersDto)
.then((job: Job<WorkerInputVariables, CustomHeaderShape>) => {
taskHandler(
{
...job,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
...this.makeCompleteHandlers(job as any, req.type),
},
this
)
})
.catch((e) =>
this.zbClient.failJob({
jobKey: res.key,
errorMessage: `Error parsing variable payload ${e}`,
retries: res.retries - 1,
retryBackOff: 0,
})
)
})
this.streams.push(stream)
return {
Expand Down
30 changes: 23 additions & 7 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,12 +603,28 @@ You should call only one job action method in the worker handler. This is a bug
}
this.activeJobs += res.jobs.length

const jobs = res.jobs.map((job) =>
parseVariablesAndCustomHeadersToJSON<
WorkerInputVariables,
CustomHeaderShape
>(job, this.inputVariableDto, this.customHeadersDto)
)
this.handleJobs(jobs)
Promise.all(
res.jobs
.map((job) =>
parseVariablesAndCustomHeadersToJSON<
WorkerInputVariables,
CustomHeaderShape
>(job, this.inputVariableDto, this.customHeadersDto)
.then((job) => job)
.catch((e) => {
this.zbClient.failJob({
jobKey: job.key,
errorMessage: `Error parsing variable payload: ${e}`,
retries: job.retries - 1,
retryBackOff: 0,
})
return null
})
)
.filter(
(f): f is Promise<ZB.Job<WorkerInputVariables, CustomHeaderShape>> =>
!!f
)
).then((jobs) => this.handleJobs(jobs))
}
}
32 changes: 21 additions & 11 deletions src/zeebe/lib/stringifyVariables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,27 @@ export function parseVariablesAndCustomHeadersToJSON<Variables, CustomHeaders>(
inputVariableDto: new (...args: any[]) => Readonly<Variables>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto: new (...args: any[]) => Readonly<CustomHeaders>
): Job<Variables, CustomHeaders> {
return Object.assign({}, response, {
customHeaders: losslessParse(
response.customHeaders,
customHeadersDto
) as CustomHeaders,
variables: losslessParse(
response.variables,
inputVariableDto
) as Readonly<Variables>,
}) as Job<Variables, CustomHeaders>
): Promise<Job<Variables, CustomHeaders>> {
return new Promise((resolve, reject) => {
try {
resolve(
Object.assign({}, response, {
customHeaders: losslessParse(
response.customHeaders,
customHeadersDto
) as CustomHeaders,
variables: losslessParse(
response.variables,
inputVariableDto
) as Readonly<Variables>,
}) as Job<Variables, CustomHeaders>
)
} catch (e) {
console.error(`Error parsing variables ${e}`)
console.error('Job', response)
reject(e)
}
})
}

export function stringifyVariables<
Expand Down
12 changes: 10 additions & 2 deletions src/zeebe/zb/ZeebeGrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,22 @@ export class ZeebeGrpcClient extends TypedEmitter<

const stream = await (await this.grpc).activateJobsStream(req)
stream.on('data', (res: Grpc.ActivateJobsResponse) => {
const parsedJobs = res.jobs.map((job) =>
res.jobs.forEach((job) =>
parseVariablesAndCustomHeadersToJSON<Variables, CustomHeaders>(
job,
inputVariableDtoToUse,
customHeadersDtoToUse
)
.then((parsedJob) => jobs.push(parsedJob))
.catch((e) =>
this.failJob({
jobKey: job.key,
errorMessage: `Error parsing job variables ${e}`,
retries: job.retries - 1,
retryBackOff: 0,
})
)
)
jobs.push(...parsedJobs)
})

stream.on('end', () => {
Expand Down

0 comments on commit 495c05e

Please sign in to comment.