Skip to content

Commit

Permalink
feat: improved listener
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Mar 26, 2024
1 parent 6c36142 commit 5893104
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 19 deletions.
15 changes: 7 additions & 8 deletions examples/fanout-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ const parentWorkflow: Workflow = {
steps: [
{
name: 'parent-spawn',
timeout:'10s',
run: async (ctx) => {
const res = await ctx.spawnWorfklow(
const res = await ctx.spawnWorfklow<string>(
'child-workflow',
{ input: 'child-input' });
{ input: 'child-input' }).result();

const res2 = await ctx.spawnWorfklow(
'child-workflow',
{ input: 'child-input' });
return { spawned: [res, res2] }
return { spawned: [res] }
},
}
],
Expand All @@ -39,11 +37,12 @@ const childWorkflow: Workflow = {
},
steps: [
{
name: 'parent-spawn',
name: 'child-work',
run: async (ctx) => {
const { input } = ctx.workflowInput();
await sleep(3000);
console.log('child workflow input:', input);
return {}
return { "child-output": "input"}
},
}
],
Expand Down
2 changes: 1 addition & 1 deletion src/clients/hatchet-client/hatchet-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export class HatchetClient {
this.event = new EventClient(this.config, this.channel, clientFactory);
this.dispatcher = new DispatcherClient(this.config, this.channel, clientFactory);
this.admin = new AdminClient(this.config, this.channel, clientFactory, this.api, this.tenantId);
this.listener = new ListenerClient(this.config, this.channel, clientFactory);
this.listener = new ListenerClient(this.config, this.channel, clientFactory, this.api);

this.logger = new Logger('HatchetClient', this.config.log_level);

Expand Down
38 changes: 37 additions & 1 deletion src/clients/listener/listener-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { ClientConfig } from '@clients/hatchet-client/client-config';
import HatchetError from '@util/errors/hatchet-error';
import { Logger } from '@hatchet/util/logger';
import sleep from '@hatchet/util/sleep';
import { Api } from '../rest';
import { WorkflowRunStatus } from '../rest/generated/data-contracts';

const DEFAULT_ACTION_LISTENER_RETRY_INTERVAL = 5; // seconds
const DEFAULT_ACTION_LISTENER_RETRY_COUNT = 5;
Expand All @@ -30,18 +32,52 @@ export class ListenerClient {
config: ClientConfig;
client: PbDispatcherClient;
logger: Logger;
api: Api;

constructor(config: ClientConfig, channel: Channel, factory: ClientFactory) {
constructor(config: ClientConfig, channel: Channel, factory: ClientFactory, api: Api) {
this.config = config;
this.client = factory.create(DispatcherDefinition, channel);
this.logger = new Logger(`Listener`, config.log_level);
this.api = api;
}

async getWorkflowRun(workflowRunId: string) {
try {
const res = await this.api.workflowRunGet(this.config.tenant_id, workflowRunId);

const stepRuns = res.data.jobRuns?.[0]?.stepRuns ?? [];

if(res.data.status === WorkflowRunStatus.SUCCEEDED){
const stepRunOutput = stepRuns.reduce((acc: Record<string, any>, stepRun) => {
console.log(stepRun.output)
acc[stepRun.step?.readableId || ''] = JSON.parse(stepRun.output || "{}");
return acc;
}, {});

return {
type: StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED,
payload: JSON.stringify(stepRunOutput),
};

}
return undefined;
} catch (e: any) {
throw new HatchetError(e.message);
}
}


async *stream(workflowRunId: string) {
let listener = this.client.subscribeToWorkflowEvents({
workflowRunId,
});

const res = await this.getWorkflowRun(workflowRunId)
if(res){
yield res
}


try {
for await (const workflowEvent of listener) {
let eventType: StepRunEventType | undefined;
Expand Down
57 changes: 48 additions & 9 deletions src/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { Logger } from './util/logger';
import { parseJSON } from './util/parse';
import { AdminClient } from './clients/admin';
import { HatchetClient } from './clients/hatchet-client';
import { StepRunEventType } from './clients/listener/listener-client';
import { WorkflowRunStatus } from './clients/rest/generated/data-contracts';

export const CreateStepSchema = z.object({
name: z.string(),
Expand All @@ -26,12 +28,48 @@ interface ContextData<T, K> {
user_data: K;
}

interface ChildWorkflowRef<T> {
workflowRunId: string;
toPromise(): Promise<T>
toJSON(): string;
class ChildWorkflowRef<T> {
workflowRunId: Promise<string>;
client: HatchetClient;

constructor(workflowRunId: Promise<string>, client: HatchetClient) {
this.workflowRunId = workflowRunId;
this.client = client;
}

async stream(): Promise<AsyncGenerator<{ type: StepRunEventType; payload: string; }, void, unknown>>{
const workflowRunId = await this.workflowRunId;



return this.client.listener.stream(await this.workflowRunId);
}

async result(): Promise<T> {
return new Promise(async (resolve, reject) => {
console.log('waiting for workflow to complete');


for await (const event of await this.stream()) {
console.log('event received', event);

if(event.type === StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED){
return resolve(event.payload as T);
}
}

});
}

async toJSON(): Promise<string> {
return JSON.stringify({
workflowRunId: await this.workflowRunId,
});
}

}


export class Context<T, K> {
data: ContextData<T, K>;
input: T;
Expand Down Expand Up @@ -122,7 +160,7 @@ export class Context<T, K> {
this.client.event.putLog(stepRunId, message, level);
}

async spawnWorfklow<K=any>(workflowName: string, input: T, key?: string): Promise<string> {
spawnWorfklow<K=unknown>(workflowName: string, input: T, key?: string): ChildWorkflowRef<K> {
const { workflowRunId, stepRunId } = this.action;

const childWorkflowRunIdPromise = this.client.admin.run_workflow(workflowName, input, {
Expand All @@ -134,16 +172,17 @@ export class Context<T, K> {

this.spawnIndex++;

const childWorkflowRunId = await childWorkflowRunIdPromise;

return childWorkflowRunId;
return new ChildWorkflowRef(childWorkflowRunIdPromise, this.client);
}

// spawnScheduledWorfklow(workflowName: string, input: T, key?: string): void {
// TODO spawnScheduledWorfklow(workflowName: string, input: T, key?: string): void {
// this.client.admin.schedule_workflow(workflowName, input, user_data);
// }

// async join(refs: ChildWorkflowRef<K>[]): Promise<Array<K>> {

// this.

// this.client.admin.join(refs);
// }

Expand Down

0 comments on commit 5893104

Please sign in to comment.