Skip to content

Commit

Permalink
feat(tasks): implement max iterations (#3845)
Browse files Browse the repository at this point in the history
  • Loading branch information
brunozoric authored Jan 29, 2024
1 parent c4133e9 commit abb442c
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 5 deletions.
15 changes: 12 additions & 3 deletions packages/tasks/src/response/TaskResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const getWaitingTime = (options?: ITaskResponseContinueOptions): number | undefi
const now = new Date();
waitingTime = (options.date.getTime() - now.getTime()) / 1000;
}
if (!waitingTime) {
if (!waitingTime || waitingTime < 0) {
return undefined;
}
return waitingTime > MAX_WAITING_TIME ? waitingTime : MAX_WAITING_TIME;
Expand Down Expand Up @@ -72,13 +72,22 @@ export class TaskResponse implements ITaskResponse {
});
}

public error(error: IResponseError | Error): ITaskResponseErrorResult {
public error(error: IResponseError | Error | string): ITaskResponseErrorResult {
return this.response.error({
error: error instanceof Error ? getErrorProperties(error) : error
error: this.getError(error)
});
}

public aborted(): ITaskResponseAbortedResult {
return this.response.aborted();
}

private getError(error: IResponseError | Error | string): IResponseError | Error {
if (error instanceof Error) {
return getErrorProperties(error);
} else if (typeof error === "string") {
return new Error(error);
}
return error;
}
}
2 changes: 1 addition & 1 deletion packages/tasks/src/response/abstractions/TaskResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ export interface ITaskResponse<
> {
done: (message?: string, output?: O) => ITaskResponseDoneResult<O>;
continue: (data: T, options?: ITaskResponseContinueOptions) => ITaskResponseContinueResult<T>;
error: (error: IResponseError | Error) => ITaskResponseErrorResult;
error: (error: IResponseError | Error | string) => ITaskResponseErrorResult;
aborted: () => ITaskResponseAbortedResult;
}
27 changes: 27 additions & 0 deletions packages/tasks/src/runner/TaskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,33 @@ export class TaskManager<T = ITaskDataInput> implements ITaskManager<T> {
});
}
}
/**
* We do not want to run the task indefinitely.
* If the task has reached the max iterations, we will stop it and execute the onMaxIterations handler, if any.
*/
//
else if (this.store.getTask().iterations >= definition.maxIterations) {
try {
if (definition.onMaxIterations) {
await definition.onMaxIterations({
task: this.store.getTask(),
context: this.context
});
}
return this.response.error({
error: {
message: "Task reached max iterations."
}
});
} catch (ex) {
return this.response.error({
error: {
message: "Failed to execute onMaxIterations handler.",
data: getErrorProperties(ex)
}
});
}
}
/**
* Always update the task iteration.
*/
Expand Down
19 changes: 18 additions & 1 deletion packages/tasks/src/task/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import {
ITaskResponseDoneResultOutput
} from "~/types";

/**
* By default, we will stop iterating through the task after DEFAULT_MAX_ITERATIONS.
*
* This mechanism will prevent infinite loops in case of a bug in the task code.
*/
const DEFAULT_MAX_ITERATIONS = 500;

export interface ITaskPluginSetFieldsCallback {
(fields: ITaskDefinitionField[]): ITaskDefinitionField[] | undefined;
}
Expand All @@ -16,8 +23,9 @@ export interface ITaskDefinitionParams<
C extends Context = Context,
I = any,
O extends ITaskResponseDoneResultOutput = ITaskResponseDoneResultOutput
> extends Omit<ITaskDefinition<C, I, O>, "fields"> {
> extends Omit<ITaskDefinition<C, I, O>, "fields" | "maxIterations"> {
config?: (task: Pick<TaskDefinitionPlugin<C, I, O>, "addField" | "setFields">) => void;
maxIterations?: number;
}

export class TaskDefinitionPlugin<
Expand Down Expand Up @@ -62,11 +70,20 @@ export class TaskDefinitionPlugin<
return this.task.onError;
}

public get onMaxIterations() {
return this.task.onMaxIterations;
}

public get maxIterations(): number {
return this.task.maxIterations || DEFAULT_MAX_ITERATIONS;
}

public constructor(task: ITaskDefinitionParams<C, I, O>) {
super();
this.isPrivate = task.isPrivate || false;
this.task = {
...task,
maxIterations: task.maxIterations || DEFAULT_MAX_ITERATIONS,
fields: []
};
if (typeof task.config === "function") {
Expand Down
14 changes: 14 additions & 0 deletions packages/tasks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ export interface ITaskOnAbortParams<C extends Context> {
task: ITask;
}

export interface ITaskOnMaxIterationsParams<C extends Context> {
context: C;
task: ITask;
}

export enum TaskResponseStatus {
DONE = "done",
ERROR = "error",
Expand Down Expand Up @@ -341,6 +346,10 @@ export interface ITaskDefinition<
* A description of the task, for the UI.
*/
description?: string;
/**
* Maximum number a step function can call the Lambda.
*/
maxIterations: number;
/**
* Task run method.
*/
Expand All @@ -365,6 +374,11 @@ export interface ITaskDefinition<
* This method will be called when user aborts the task.
*/
onAbort?: (params: ITaskOnAbortParams<C>) => Promise<void>;
/**
* When task hits max iterations, this method will be called.
* This will be called during the run time of the task.
*/
onMaxIterations?: (params: ITaskOnMaxIterationsParams<C>) => Promise<void>;
/**
* Custom input fields and layout for the task input.
*/
Expand Down

0 comments on commit abb442c

Please sign in to comment.