Skip to content

Commit

Permalink
use recommended secondsUntilNextWorkflow to schedule orchestator work…
Browse files Browse the repository at this point in the history
…flow dynamically
  • Loading branch information
jfrank-summit committed Feb 6, 2025
1 parent 716e39d commit 12b5763
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 47 deletions.
9 changes: 4 additions & 5 deletions src/agents/workflows/orchestrator/nodes/finishWorkflowNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { AIMessage } from '@langchain/core/messages';
import { VectorDB } from '../../../../services/vectorDb/VectorDB.js';
const logger = createLogger('finish-workflow-node');

const parseFinishWorkflow = async (content: unknown) => {
export const parseFinishedWorkflow = async (content: unknown) => {
const contentString = typeof content === 'string' ? content : JSON.stringify(content, null, 2);
if (typeof contentString === 'string') {
try {
Expand Down Expand Up @@ -40,18 +40,17 @@ export const createFinishWorkflowNode = (
messages,
currentTime: new Date().toISOString(),
});
logger.info('Summarizing messages:', { messages });

const result = await orchestratorModel.invoke(formattedPrompt);
const finishedWorkflow = await parseFinishWorkflow(result.content);
const result = await orchestratorModel.bind({ tools: [] }).invoke(formattedPrompt);
const finishedWorkflow = await parseFinishedWorkflow(result.content);

logger.info('Finished Workflow:', { finishedWorkflow });

await vectorStore.insert(JSON.stringify(finishedWorkflow));

return {
messages: [new AIMessage({ content: result.content })],
...finishedWorkflow,
finishedWorkflow,
};
};
return runNode;
Expand Down
8 changes: 3 additions & 5 deletions src/agents/workflows/orchestrator/nodes/inputPrompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ export const createInputPrompt = async (customInstructions?: string) => {
**Permanent Storage (Autonomy Network's DSN)**:
- Use this for **immutable, permanent** experiences that you would like to survive forever (e.g., fine-tuning/RAG workflows).
- **SAVE TO PERMANENT STORAGE WHEN**:
- After you complete a significant action (e.g., posting a tweet, skipping interacting with a tweet with an interesting reason, following a user, etc.).
- Save detailed information about the action (e.g., tweet text, tweet id, user details).
- After you complete a significant action.
- Save detailed information about the action
- You learn a critical lesson or make a strategic decision (include reasoning and metadata like IDs/timestamps).
- **FORMAT**:
- Include timestamps, IDs, reasoning, and full context (e.g., tweet text, decision logic).
GOOD: Saving your experiences to permanent storage with significant detail that will help recreate your experiences in the future!
BAD: Not saving your experiences to permanent storage. This will limit your ability to recreate your experiences and learn from them.
- Include timestamps, IDs, reasoning, and full context
Custom Instructions:
{customInstructions}
Expand Down
32 changes: 26 additions & 6 deletions src/agents/workflows/orchestrator/orchestratorWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { StructuredToolInterface } from '@langchain/core/tools';
import { RunnableToolLike } from '@langchain/core/runnables';
import { VectorDB } from '../../../services/vectorDb/VectorDB.js';
import { join } from 'path';
import { FinishedWorkflow } from './nodes/finishWorkflowPrompt.js';
import { parseFinishedWorkflow } from './nodes/finishWorkflowNode.js';
const logger = createLogger('orchestrator-workflow');

const createWorkflowConfig = async (
Expand Down Expand Up @@ -52,7 +54,10 @@ const createOrchestratorWorkflow = async (nodes: Awaited<ReturnType<typeof creat
};

export type OrchestratorRunner = Readonly<{
runWorkflow: (input?: OrchestratorInput, options?: { threadId?: string }) => Promise<unknown>;
runWorkflow: (
input?: OrchestratorInput,
options?: { threadId?: string },
) => Promise<FinishedWorkflow>;
}>;

export const createOrchestratorRunner = async (
Expand All @@ -75,8 +80,11 @@ export const createOrchestratorRunner = async (
const app = workflow.compile({ checkpointer: memoryStore });

return {
runWorkflow: async (input?: OrchestratorInput, options?: { threadId?: string }) => {
const threadId = options?.threadId || 'orchestrator_workflow_state';
runWorkflow: async (
input?: OrchestratorInput,
options?: { threadId?: string },
): Promise<FinishedWorkflow> => {
const threadId = `${options?.threadId || 'orchestrator'}-${Date.now()}`;
logger.info('Starting orchestrator workflow', { threadId });

if (!vectorStore.isOpen()) {
Expand All @@ -93,15 +101,27 @@ export const createOrchestratorRunner = async (

const initialState = input || { messages: [] };
const stream = await app.stream(initialState, config);
let finalState = {};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let finalState = {} as any;

for await (const state of stream) {
finalState = state;
}

logger.info('Workflow completed', { threadId });
logger.info('Workflow completed', {
threadId,
});
vectorStore.close();
return finalState;

if (finalState?.finishWorkflow?.messages?.[0]?.content) {
return await parseFinishedWorkflow(finalState.finishWorkflow.messages[0].content);
} else {
logger.error('Workflow completed but no finished workflow data found', {
finalState,
content: finalState?.finishWorkflow?.content,
});
return { workflowSummary: 'Extracting workflow data failed' };
}
},
};
};
Expand Down
3 changes: 2 additions & 1 deletion src/agents/workflows/twitter/prompts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ export const createTwitterPrompts = async () => {
- In order to gain context you should check tweets and replies that you have recently posted before posting or replying to tweets in orderto avoid being repetitive. Once this has been done, you can then post or reply to a tweet.
- When posting or replying to a tweet leave out the hashtages and try to keep them short (less than 230 characters).
- Do not be repetitive, use different phrases and words with each post.
- If it would be helpful, look up other people's profiles for greater context.`;
- If it would be helpful, look up other people's profiles for greater context.
- If you find a user that you think is interesting, follow them.`;

const customMessageSummaryInstructions = `
- Summarize the actions taken in detail. Include reasoning and metadata like tweet IDs/timestamps.
Expand Down
56 changes: 26 additions & 30 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,37 @@ process.on('SIGTERM', () => {
});

const runner = await orchestratorRunner();
const startWorkflowPolling = async () => {
try {
const initalMessage = new HumanMessage(`
As a social media manager, you are expected to interact with twitter periodically in order to maintain social engagement. Use your judgement how frequently you should run these interactions and what you should do. For efficiency, just have one or two tasks per request. You don't need do the same things every time. Save any interesting experiences from your interactions your permanent storage.
EXAMPLES:
- Check your timiline for interesting conversations and join the conversation.
- Like interesting tweets.
- Follow interesting users.
- Check your mentions and reply to useful conversations that you haven't replied to yet.
- Post a new tweet.
`);
const initalMessage = `As a social media manager, you are expected to interact with twitter periodically in order to maintain social engagement. Use your judgement how frequently you should run these interactions and what you should do. You don't need do the same things every time. Save any interesting experiences from your interactions your permanent storage.
const _result = await runner.runWorkflow({ messages: [initalMessage] });

logger.info('Workflow execution completed successfully for character:', {
charcterName: config.characterConfig.name,
runFinished: new Date().toISOString(),
nextRun: `${config.twitterConfig.RESPONSE_INTERVAL_MS / 1000 / 60} minutes`,
});
} catch (error) {
if (error && typeof error === 'object' && 'name' in error && error.name === 'ExitPromptError') {
logger.info('Process terminated by user');
process.exit(0);
}
logger.error('Error running workflow:', error);
process.exit(1);
}
};
EXAMPLES:
- Check your timiline for interesting conversations and join the conversation.
- Like interesting tweets.
- Follow interesting users.
- Check your mentions and reply to useful conversations that you haven't replied to yet.
- Post a new tweet.
`;

const main = async () => {
try {
await validateLocalHash();
await startWorkflowPolling();
setInterval(startWorkflowPolling, config.twitterConfig.RESPONSE_INTERVAL_MS);

let message = initalMessage;
while (true) {
const result = await runner.runWorkflow({ messages: [new HumanMessage(message)] });
message = `Previous workflow summary ran at ${new Date().toISOString()}: ${result.workflowSummary}
Instructions: ${result.nextRecommendedAction}`;

logger.info('Workflow execution result:', { result });

const nextDelaySeconds =
result.secondsUntilNextWorkflow ?? config.twitterConfig.RESPONSE_INTERVAL_MS / 1000;
logger.info('Workflow execution completed successfully for character:', {
charcterName: config.characterConfig.name,
runFinished: new Date().toISOString(),
nextRun: `${nextDelaySeconds / 60} minutes`,
});
await new Promise(resolve => setTimeout(resolve, nextDelaySeconds * 1000));
}
} catch (error) {
if (error && typeof error === 'object' && 'name' in error && error.name === 'ExitPromptError') {
logger.info('Process terminated by user');
Expand Down

0 comments on commit 12b5763

Please sign in to comment.