diff --git a/src/zeebe/lib/ZBStreamWorker.ts b/src/zeebe/lib/ZBStreamWorker.ts index 8c1a8fc4..269d8625 100644 --- a/src/zeebe/lib/ZBStreamWorker.ts +++ b/src/zeebe/lib/ZBStreamWorker.ts @@ -66,20 +66,28 @@ export class ZBStreamWorker implements IZBJobWorker { console.error(e) }) stream.on('data', (res: ActivatedJob) => { - // Make handlers - const job: Job = - parseVariablesAndCustomHeadersToJSON< - WorkerInputVariables, - CustomHeaderShape - >(res, inputVariableDto, customHeadersDto) - taskHandler( - { - ...job, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ...this.makeCompleteHandlers(job as any, req.type), - }, - this - ) + parseVariablesAndCustomHeadersToJSON< + WorkerInputVariables, + CustomHeaderShape + >(res, inputVariableDto, customHeadersDto) + .then((job: Job) => { + taskHandler( + { + ...job, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ...this.makeCompleteHandlers(job as any, req.type), + }, + this + ) + }) + .catch((e) => + this.zbClient.failJob({ + jobKey: res.key, + errorMessage: `Error parsing variable payload ${e}`, + retries: res.retries - 1, + retryBackOff: 0, + }) + ) }) this.streams.push(stream) return { diff --git a/src/zeebe/lib/ZBWorkerBase.ts b/src/zeebe/lib/ZBWorkerBase.ts index 57b6300d..730c1f65 100644 --- a/src/zeebe/lib/ZBWorkerBase.ts +++ b/src/zeebe/lib/ZBWorkerBase.ts @@ -603,12 +603,28 @@ You should call only one job action method in the worker handler. This is a bug } this.activeJobs += res.jobs.length - const jobs = res.jobs.map((job) => - parseVariablesAndCustomHeadersToJSON< - WorkerInputVariables, - CustomHeaderShape - >(job, this.inputVariableDto, this.customHeadersDto) - ) - this.handleJobs(jobs) + Promise.all( + res.jobs + .map((job) => + parseVariablesAndCustomHeadersToJSON< + WorkerInputVariables, + CustomHeaderShape + >(job, this.inputVariableDto, this.customHeadersDto) + .then((job) => job) + .catch((e) => { + this.zbClient.failJob({ + jobKey: job.key, + errorMessage: `Error parsing variable payload: ${e}`, + retries: job.retries - 1, + retryBackOff: 0, + }) + return null + }) + ) + .filter( + (f): f is Promise> => + !!f + ) + ).then((jobs) => this.handleJobs(jobs)) } } diff --git a/src/zeebe/lib/stringifyVariables.ts b/src/zeebe/lib/stringifyVariables.ts index 967dd0b2..57770490 100644 --- a/src/zeebe/lib/stringifyVariables.ts +++ b/src/zeebe/lib/stringifyVariables.ts @@ -17,17 +17,27 @@ export function parseVariablesAndCustomHeadersToJSON( inputVariableDto: new (...args: any[]) => Readonly, // eslint-disable-next-line @typescript-eslint/no-explicit-any customHeadersDto: new (...args: any[]) => Readonly -): Job { - return Object.assign({}, response, { - customHeaders: losslessParse( - response.customHeaders, - customHeadersDto - ) as CustomHeaders, - variables: losslessParse( - response.variables, - inputVariableDto - ) as Readonly, - }) as Job +): Promise> { + return new Promise((resolve, reject) => { + try { + resolve( + Object.assign({}, response, { + customHeaders: losslessParse( + response.customHeaders, + customHeadersDto + ) as CustomHeaders, + variables: losslessParse( + response.variables, + inputVariableDto + ) as Readonly, + }) as Job + ) + } catch (e) { + console.error(`Error parsing variables ${e}`) + console.error('Job', response) + reject(e) + } + }) } export function stringifyVariables< diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 948fb2b4..0e28649a 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -284,14 +284,22 @@ export class ZeebeGrpcClient extends TypedEmitter< const stream = await (await this.grpc).activateJobsStream(req) stream.on('data', (res: Grpc.ActivateJobsResponse) => { - const parsedJobs = res.jobs.map((job) => + res.jobs.forEach((job) => parseVariablesAndCustomHeadersToJSON( job, inputVariableDtoToUse, customHeadersDtoToUse ) + .then((parsedJob) => jobs.push(parsedJob)) + .catch((e) => + this.failJob({ + jobKey: job.key, + errorMessage: `Error parsing job variables ${e}`, + retries: job.retries - 1, + retryBackOff: 0, + }) + ) ) - jobs.push(...parsedJobs) }) stream.on('end', () => {