From 5d623e6a528c1004f997b418966d9c86533bbf59 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Sun, 7 Apr 2024 17:19:38 -0400 Subject: [PATCH 1/2] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c7b37e08..10e13d38 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ ControlFlow is a framework for integrating AI agents into traditional workflows. It allows for agents that can be precisely controlled, observed, and debugged, while retaining the autonomy and flexibility that make LLMs so powerful. ControlFlow agents are designed to be invoked programmatically, though they are capable of interacting with humans and other agents as well. -ControlFlow is built with [Marvin](https://github.com/prefecthq/marvin) and [Prefect](https://github.com/prefecthq/prefect). +🚨 ControlFlow is built on top of bleeding-edge versions of [Marvin](https://github.com/prefecthq/marvin) and [Prefect](https://github.com/prefecthq/prefect). Caveat emptor! ## Example From bddb485fe2e592d7e719697ee2277552ca27ca49 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Sun, 7 Apr 2024 21:17:42 -0400 Subject: [PATCH 2/2] Support multiple assistants for tasks --- README.md | 4 +- examples/documentation.py | 8 +- examples/readme_example.py | 4 +- src/control_flow/__init__.py | 4 +- src/control_flow/agent.py | 303 ++++++++++++++++------------- src/control_flow/delegation.py | 86 ++++++++ src/control_flow/flow.py | 39 ++-- src/control_flow/settings.py | 22 ++- tests/flows/test_sign_guestbook.py | 47 +++++ 9 files changed, 348 insertions(+), 169 deletions(-) create mode 100644 src/control_flow/delegation.py create mode 100644 tests/flows/test_sign_guestbook.py diff --git a/README.md b/README.md index 10e13d38..730efb78 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ ControlFlow is a framework for integrating AI agents into traditional workflows. ## Example ```python -from control_flow import ai_flow, ai_task, run_agent, instructions +from control_flow import ai_flow, ai_task, run_ai_task, instructions from pydantic import BaseModel @@ -40,7 +40,7 @@ def demo(): name = get_user_name() # define an AI task inline - interests = run_agent("ask user for three interests", cast=list[str], user_access=True) + interests = run_ai_task("ask user for three interests", cast=list[str], user_access=True) # set instructions for just the next task with instructions("no more than 8 lines"): diff --git a/examples/documentation.py b/examples/documentation.py index 04db8146..1a6b5bba 100644 --- a/examples/documentation.py +++ b/examples/documentation.py @@ -4,7 +4,7 @@ import control_flow from control_flow import ai_flow, ai_task from marvin.beta.assistants import Assistant, Thread -from marvin.tools.filesystem import ls, read, read_lines, write +from marvin.tools.filesystem import read, write ROOT = Path(control_flow.__file__).parents[2] @@ -23,7 +23,7 @@ def glob(pattern: str) -> list[str]: You are an expert technical writer who writes wonderful documentation for open-source tools and believes that documentation is a product unto itself. """, - tools=[read, read_lines, ls, write, glob], + tools=[read, write, glob], ) @@ -35,7 +35,7 @@ def examine_source_code(source_dir: Path, extensions: list[str]): """ -@ai_task +@ai_task(model="gpt-3.5-turbo") def read_docs(docs_dir: Path): """ Read all documentation in the docs dir and subdirectories, if any. @@ -52,7 +52,7 @@ def write_docs(docs_dir: Path, instructions: str = None): @ai_flow(assistant=assistant) def docs_flow(instructions: str): examine_source_code(ROOT / "src", extensions=[".py"]) - read_docs(ROOT / "docs") + # read_docs(ROOT / "docs") write_docs(ROOT / "docs", instructions=instructions) diff --git a/examples/readme_example.py b/examples/readme_example.py index d88de7d3..0ac0298b 100644 --- a/examples/readme_example.py +++ b/examples/readme_example.py @@ -1,4 +1,4 @@ -from control_flow import ai_flow, ai_task, instructions, run_agent +from control_flow import ai_flow, ai_task, instructions, run_ai_task from pydantic import BaseModel @@ -26,7 +26,7 @@ def demo(): name = get_user_name() # define an AI task inline - interests = run_agent( + interests = run_ai_task( "ask user for three interests", cast=list[str], user_access=True, diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index 97f9c013..371a8e4a 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,5 +1,7 @@ from .settings import settings -from .agent import ai_task, Agent, run_agent +from .agent import ai_task, Agent, run_ai_task from .flow import ai_flow from .instructions import instructions + +from marvin.beta.assistants import Assistant diff --git a/src/control_flow/agent.py b/src/control_flow/agent.py index f8d5c1a8..39ee2088 100644 --- a/src/control_flow/agent.py +++ b/src/control_flow/agent.py @@ -2,19 +2,20 @@ import inspect import json import logging +from datetime import datetime from typing import Callable, Generic, TypeVar, Union +from zoneinfo import ZoneInfo import marvin import marvin.utilities.tools import prefect -from marvin.beta.assistants import Thread from marvin.beta.assistants.assistants import Assistant from marvin.beta.assistants.handlers import PrintHandler from marvin.beta.assistants.runs import Run from marvin.tools.assistants import AssistantTool, EndRun from marvin.types import FunctionTool from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method -from marvin.utilities.jinja import Environment +from marvin.utilities.jinja import BaseEnvironment from openai.types.beta.threads.runs import ToolCall from prefect import get_client as get_prefect_client from prefect import task as prefect_task @@ -23,6 +24,7 @@ from control_flow import settings from control_flow.context import ctx +from control_flow.delegation import DelegationStrategy, RoundRobin from control_flow.utilities.prefect import ( create_json_artifact, create_markdown_artifact, @@ -36,7 +38,14 @@ logger = logging.getLogger(__name__) NOT_PROVIDED = object() -TEMP_THREADS = {} + +jinja_environment = BaseEnvironment( + globals={ + "now": lambda: datetime.now(ZoneInfo("UTC")), + "inspect": inspect, + "id": id, + } +) TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( @@ -61,10 +70,11 @@ INSTRUCTIONS = """ -You are an AI assistant. Your job is to complete the tasks assigned to you. You -were created by a software application, and any messages you receive are from -that software application, not a user. You may use any tools at your disposal to -complete the task, including talking to a human user. +You are an AI assistant. Your name is `{{ assistant.name}}`. Your job is to +complete the tasks assigned to you. You were created by a software application, +and any messages you receive are from that software application, not a user. You +may use any tools at your disposal to complete the task, including talking to a +human user. ## Instructions @@ -85,6 +95,21 @@ {% endfor %} +## Assistants + +The following assistants are collaborating to complete the tasks. + +{% for a in agent.assistants %} +- {{ a.name }} {%- if id(a) == id(assistant) %} (this is you){%endif%}: {{ a.description | default("(no description provided)")}} +{% endfor %} + +{% if agent.assistants|length > 1 %} +Use the `end_run` tool to end your turn and allow another assistant to take +over, without marking a task as complete or failed. +{% endif %} + + + ## Tasks {% if agent.tasks %} @@ -130,7 +155,10 @@ {% if agent.system_access -%} The software that created you is an AI capable of processing natural language, -so you can freely respond by posting messages to the thread. +so you can freely respond by posting messages to the thread. There may be more +than one AI posting responses, so preface your messages with your own name to +avoid confusion. For example, if your name is "Marvin", start all of your +messages with "Marvin:". This is not required for tool calls. {% else %} The software that created you is a Python script that can only process structured responses produced by your tools. DO NOT POST ANY MESSAGES OR RESPONSES TO THE @@ -161,76 +189,6 @@ """ -class AgentHandler(PrintHandler): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.tool_calls = {} - - async def on_tool_call_created(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is created""" - - if tool_call.type == "function": - task_run_name = "Prepare arguments for tool call" - else: - task_run_name = f"Tool call: {tool_call.type}" - - client = get_prefect_client() - engine_context = FlowRunContext.get() - if not engine_context: - return - - task_run = await client.create_task_run( - task=prefect.Task(fn=lambda: None), - name=task_run_name, - extra_tags=["tool-call"], - flow_run_id=engine_context.flow_run.id, - dynamic_key=tool_call.id, - state=prefect.states.Running(), - ) - - self.tool_calls[tool_call.id] = task_run - - async def on_tool_call_done(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is done""" - - client = get_prefect_client() - task_run = self.tool_calls.get(tool_call.id) - if not task_run: - return - await client.set_task_run_state( - task_run_id=task_run.id, state=prefect.states.Completed(), force=True - ) - - # code interpreter is run as a single call, so we can publish a result artifact - if tool_call.type == "code_interpreter": - # images = [] - # for output in tool_call.code_interpreter.outputs: - # if output.type == "image": - # image_path = download_temp_file(output.image.file_id) - # images.append(image_path) - - create_python_artifact( - key="code", - code=tool_call.code_interpreter.input, - description="Code executed in the code interpreter", - task_run_id=task_run.id, - ) - create_json_artifact( - key="output", - data=tool_call.code_interpreter.outputs, - description="Output from the code interpreter", - task_run_id=task_run.id, - ) - - elif tool_call.type == "function": - create_json_artifact( - key="arguments", - data=json.dumps(json.loads(tool_call.function.arguments), indent=2), - description=f"Arguments for the `{tool_call.function.name}` tool", - task_run_id=task_run.id, - ) - - def talk_to_human(message: str, get_response: bool = True) -> str: """ Send a message to the human user and optionally wait for a response. @@ -245,14 +203,19 @@ def talk_to_human(message: str, get_response: bool = True) -> str: def end_run(): - """Use this tool to end the run.""" + """Use this tool to end the run without marking a task as complete or failed.""" return EndRun() class Agent(BaseModel, Generic[T], ExposeSyncMethodsMixin): tasks: list[AITask] = [] flow: AIFlow = Field(None, validate_default=True) - assistant: Assistant = Field(None, validate_default=True) + assistants: list = Field(None, validate_default=True) + delegation_strategy: DelegationStrategy = Field( + validate_default=True, + description="The strategy for delegating work to assistants.", + default_factory=RoundRobin, + ) tools: list[Union[AssistantTool, Assistant, Callable]] = [] context: dict = Field(None, validate_default=True) user_access: bool = Field( @@ -284,14 +247,14 @@ def _default_context(cls, v): v = {} return v - @field_validator("assistant", mode="before") - def _default_assistant(cls, v): + @field_validator("assistants", mode="before") + def _default_assistants(cls, v): if v is None: flow = ctx.get("flow") if flow: - v = flow.assistant + v = flow.assistants if v is None: - v = Assistant() + v = [Assistant()] return v @field_validator("user_access", "system_access", mode="before") @@ -303,22 +266,21 @@ def _default_access(cls, v): def numbered_tasks(self) -> list[tuple[int, AITask]]: return [(i + 1, task) for i, task in enumerate(self.tasks)] - def _get_instructions(self, context: dict = None): - instructions = Environment.render( + def _get_instructions(self, assistant: Assistant, context: dict = None): + instructions = jinja_environment.render( INSTRUCTIONS, agent=self, flow=self.flow, - assistant=self.assistant, + assistant=assistant, instructions=ctx.get("instructions", []), context={**self.context, **(context or {})}, ) - return instructions - def _get_tools(self) -> list[AssistantTool]: - tools = self.flow.tools + self.tools + self.assistant.tools + def _get_tools(self, assistant: Assistant) -> list[AssistantTool]: + tools = self.flow.tools + self.tools + assistant.tools - if not self.tasks: + if not self.tasks or len(self.assistants) > 1: tools.append(end_run) # if there is only one task, and the agent can't send a response to the @@ -341,9 +303,7 @@ def _get_tools(self) -> list[AssistantTool]: final_tools = [] for tool in tools: - if isinstance(tool, marvin.beta.assistants.Assistant): - tool = self.model_copy(update={"assistant": tool}).as_tool() - elif not isinstance(tool, AssistantTool): + if not isinstance(tool, AssistantTool): tool = marvin.utilities.tools.tool_from_function(tool) if isinstance(tool, FunctionTool): @@ -383,28 +343,37 @@ async def modified_fn( final_tools.append(tool) return final_tools - def _get_openai_run_task(self): + def _get_run_assistant_task(self): """ Helper function for building the task that will execute the OpenAI assistant run. This needs to be regenerated each time in case the instructions change. """ - @prefect_task(task_run_name=f"Run OpenAI assistant ({self.assistant.name})") - async def execute_openai_run( - context: dict = None, run_kwargs: dict = None + def _name_from_assistant(): + """Helper function for naming task runs""" + from prefect.runtime import task_run + + assistant = task_run.parameters.get("assistant") + return f"Run OpenAI assistant ({assistant.name})" + + @prefect_task(task_run_name=_name_from_assistant) + async def run_openai_assistant( + assistant: Assistant, context: dict = None, run_kwargs: dict = None ) -> Run: run_kwargs = run_kwargs or {} model = run_kwargs.pop( "model", - self.assistant.model or self.flow.model or settings.assistant_model, + assistant.model or self.flow.model or settings.assistant_model, ) thread = run_kwargs.pop("thread", self.flow.thread) run = Run( - assistant=self.assistant, + assistant=assistant, thread=thread, - instructions=self._get_instructions(context=context), - tools=self._get_tools(), + instructions=self._get_instructions( + assistant=assistant, context=context + ), + tools=self._get_tools(assistant=assistant), event_handler_class=AgentHandler, model=model, **run_kwargs, @@ -424,13 +393,18 @@ async def execute_openai_run( ) return run - return execute_openai_run + return run_openai_assistant @expose_sync_method("run") async def run_async(self, context: dict = None, **run_kwargs) -> list[AITask]: - openai_run = self._get_openai_run_task() + assistants_generator = self.delegation_strategy.run(self.assistants) + openai_run = self._get_run_assistant_task() - openai_run(context=context, run_kwargs=run_kwargs) + openai_run( + assistant=next(assistants_generator), + context=context, + run_kwargs=run_kwargs, + ) # if this AI can't post messages to the system, then continue to invoke # it until all tasks are finished @@ -440,49 +414,94 @@ async def run_async(self, context: dict = None, **run_kwargs) -> list[AITask]: any(t.status == TaskStatus.PENDING for t in self.tasks) and counter < settings.max_agent_iterations ): - openai_run(context=context, run_kwargs=run_kwargs) + openai_run( + assistant=next(assistants_generator), + context=context, + run_kwargs=run_kwargs, + ) counter += 1 result = [t.result for t in self.tasks if t.status == TaskStatus.COMPLETED] return result - def as_tool(self): - thread = TEMP_THREADS.setdefault(self.assistant.model_dump_json(), Thread()) - - def _run(message: str, context: dict = None) -> list[str]: - task = self._get_openai_run_task() - run: Run = task(context=context, run_kwargs=dict(thread=thread)) - return [m.model_dump_json() for m in run.messages] - - return marvin.utilities.tools.tool_from_function( - _run, - name=f"call_ai_{self.assistant.name}", - description=inspect.cleandoc(""" - Use this tool to talk to a sub-AI that can operate independently of - you. The sub-AI may have a different skillset or be able to access - different tools than you. The sub-AI will run one iteration and - respond to you. You may continue to invoke it multiple times in sequence, as - needed. - - Note: you can only talk to one sub-AI at a time. Do not call in parallel or you will get an error about thread conflicts. - - ## Sub-AI Details - - - Name: {name} - - Instructions: {instructions} - """).format( - name=self.assistant.name, instructions=self.assistant.instructions - ), + +class AgentHandler(PrintHandler): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.tool_calls = {} + + async def on_tool_call_created(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is created""" + + if tool_call.type == "function": + task_run_name = "Prepare arguments for tool call" + else: + task_run_name = f"Tool call: {tool_call.type}" + + client = get_prefect_client() + engine_context = FlowRunContext.get() + if not engine_context: + return + + task_run = await client.create_task_run( + task=prefect.Task(fn=lambda: None), + name=task_run_name, + extra_tags=["tool-call"], + flow_run_id=engine_context.flow_run.id, + dynamic_key=tool_call.id, + state=prefect.states.Running(), ) + self.tool_calls[tool_call.id] = task_run + + async def on_tool_call_done(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is done""" + + client = get_prefect_client() + task_run = self.tool_calls.get(tool_call.id) + if not task_run: + return + await client.set_task_run_state( + task_run_id=task_run.id, state=prefect.states.Completed(), force=True + ) + + # code interpreter is run as a single call, so we can publish a result artifact + if tool_call.type == "code_interpreter": + # images = [] + # for output in tool_call.code_interpreter.outputs: + # if output.type == "image": + # image_path = download_temp_file(output.image.file_id) + # images.append(image_path) + + create_python_artifact( + key="code", + code=tool_call.code_interpreter.input, + description="Code executed in the code interpreter", + task_run_id=task_run.id, + ) + create_json_artifact( + key="output", + data=tool_call.code_interpreter.outputs, + description="Output from the code interpreter", + task_run_id=task_run.id, + ) + + elif tool_call.type == "function": + create_json_artifact( + key="arguments", + data=json.dumps(json.loads(tool_call.function.arguments), indent=2), + description=f"Arguments for the `{tool_call.function.name}` tool", + task_run_id=task_run.id, + ) + def ai_task( fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict ): """ - Decorator that uses a function to create an AI task. When the function is - called, an agent is created to complete the task and return the result. + Use a Python function to create an AI task. When the function is called, an + agent is created to complete the task and return the result. """ if fn is None: @@ -504,7 +523,7 @@ def wrapper(*args, **kwargs): bound = sig.bind(*args, **kwargs) bound.apply_defaults() - return run_agent.with_options(name=f"Task: {fn.__name__}")( + return run_ai_task.with_options(name=f"Task: {fn.__name__}")( task=objective, cast=fn.__annotations__.get("return"), context=bound.arguments, @@ -529,7 +548,7 @@ def _name_from_objective(): @prefect_task(task_run_name=_name_from_objective) -def run_agent( +def run_ai_task( task: str = None, cast: T = NOT_PROVIDED, context: dict = None, @@ -538,8 +557,14 @@ def run_agent( **agent_kwargs: dict, ) -> T: """ - Run an agent to complete a task with the given objective and context. The - response will be cast to the given result type. + Create and run an agent to complete a task with the given objective and + context. This function is similar to an inline version of the @ai_task + decorator. + + This inline version is useful when you want to create and run an ad-hoc AI + task, without defining a function or using decorator syntax. It provides + more flexibility in terms of dynamically setting the task parameters. + Additional detail can be provided as `context`. """ if cast is NOT_PROVIDED: diff --git a/src/control_flow/delegation.py b/src/control_flow/delegation.py new file mode 100644 index 00000000..86c2a7bb --- /dev/null +++ b/src/control_flow/delegation.py @@ -0,0 +1,86 @@ +import itertools +from typing import Any, Generator + +import marvin +from marvin.beta.assistants import Assistant +from pydantic import BaseModel + +from control_flow.flow import get_flow_messages +from control_flow.instructions import get_instructions + + +class DelegationStrategy(BaseModel): + """ + A DelegationStrategy is a strategy for delegating tasks to AI assistants. + """ + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, delegate the task to the most qualified assistant. + """ + + raise NotImplementedError() + + +class Single(DelegationStrategy): + """ + A Single delegation strategy delegates tasks to a single AI assistant. + """ + + assistant: Assistant + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, choose the first assistant in the list. + """ + + if self.assistant not in assistants: + raise ValueError("Assistant not in list of assistants") + + while True: + yield self.assistant + + +class RoundRobin(DelegationStrategy): + """ + A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion. + """ + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, choose the next assistant in the list. + """ + + yield from itertools.cycle(assistants) + + +class Moderator(DelegationStrategy): + """ + A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier + """ + + model: str = None + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, delegate the task to the most qualified assistant. + """ + + while True: + instructions = get_instructions() + history = get_flow_messages() + + context = dict(messages=history, global_instructions=instructions) + assistant = marvin.classify( + context, + assistants, + instructions=""" + Given the conversation context, choose the AI assistant most + qualified to take the next turn at completing the tasks. Take into + account the instructions, each assistant's own instructions, and the + tools they have available. + """, + model_kwargs=dict(model=self.model), + ) + + yield assistant diff --git a/src/control_flow/flow.py b/src/control_flow/flow.py index 24dfcb81..e5184d82 100644 --- a/src/control_flow/flow.py +++ b/src/control_flow/flow.py @@ -17,19 +17,13 @@ class AIFlow(BaseModel): thread: Thread = Field(None, validate_default=True) - assistant: Optional[Assistant] = Field(None, validate_default=True) + assistants: Optional[list[Assistant]] = Field(None, validate_default=True) tools: list[Union[AssistantTool, Callable]] = Field(None, validate_default=True) instructions: Optional[str] = None model: Optional[str] = None model_config: dict = dict(validate_assignment=True, extra="forbid") - @field_validator("assistant", mode="before") - def _load_assistant_from_ctx(cls, v): - if v is None: - v = ctx.get("assistant", None) - return v - @field_validator("thread", mode="before") def _load_thread_from_ctx(cls, v): if v is None: @@ -54,7 +48,7 @@ def add_message(self, message: str): def ai_flow( fn=None, *, - assistant: Assistant = None, + assistants: list[Assistant] = None, thread: Thread = None, tools: list[Union[AssistantTool, Callable]] = None, instructions: str = None, @@ -67,7 +61,7 @@ def ai_flow( if fn is None: return functools.partial( ai_flow, - assistant=assistant, + assistants=assistants, thread=thread, tools=tools, instructions=instructions, @@ -77,7 +71,7 @@ def ai_flow( @functools.wraps(fn) def wrapper( *args, - _assistant: Assistant = None, + _assistants: list[Assistant] = None, _thread: Thread = None, _tools: list[Union[AssistantTool, Callable]] = None, _instructions: str = None, @@ -85,19 +79,14 @@ def wrapper( **kwargs, ): p_fn = prefect_flow(fn) - flow_assistant = _assistant or assistant - flow_thread = ( - _thread - or thread - or (flow_assistant.default_thread if flow_assistant else None) - or Thread() - ) + flow_assistants = _assistants or assistants + flow_thread = _thread or thread or Thread() flow_instructions = _instructions or instructions flow_tools = _tools or tools flow_model = _model or model flow_obj = AIFlow( thread=flow_thread, - assistant=flow_assistant, + assistants=flow_assistants, tools=flow_tools, instructions=flow_instructions, model=flow_model, @@ -113,13 +102,23 @@ def wrapper( return wrapper -def get_messages(limit: int = None) -> list[Message]: +def get_flow() -> AIFlow: """ - Loads messages from the flow's thread. + Loads the flow from the context. Will error if no flow is found in the context. """ flow: Optional[AIFlow] = ctx.get("flow") if not flow: raise ValueError("No flow found in context") + return flow + + +def get_flow_messages(limit: int = None) -> list[Message]: + """ + Loads messages from the flow's thread. + + Will error if no flow is found in the context. + """ + flow = get_flow() return flow.thread.get_messages(limit=limit) diff --git a/src/control_flow/settings.py b/src/control_flow/settings.py index 9e46653b..8b8af737 100644 --- a/src/control_flow/settings.py +++ b/src/control_flow/settings.py @@ -1,5 +1,6 @@ import os +from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict @@ -17,10 +18,29 @@ class ControlFlowSettings(BaseSettings): ) +class PrefectSettings(ControlFlowSettings): + """ + All settings here are used as defaults for Prefect, unless overridden by env vars. + """ + + PREFECT_LOGGING_LEVEL: str = "WARNING" + + def apply(self): + import os + + for k, v in self.model_dump().items(): + if k not in os.environ: + os.environ[k] = v + + class Settings(ControlFlowSettings): assistant_model: str = "gpt-4-1106-preview" max_agent_iterations: int = 10 - use_prefect: bool = True + prefect: PrefectSettings = Field(default_factory=PrefectSettings) + + def __init__(self, **data): + super().__init__(**data) + self.prefect.apply() settings = Settings() diff --git a/tests/flows/test_sign_guestbook.py b/tests/flows/test_sign_guestbook.py new file mode 100644 index 00000000..b3d816cf --- /dev/null +++ b/tests/flows/test_sign_guestbook.py @@ -0,0 +1,47 @@ +from control_flow import Assistant, run_ai_task +from control_flow.flow import ai_flow + +# define assistants + +a = Assistant(name="a") +b = Assistant(name="b") +c = Assistant(name="c") + + +# define tools + +GUESTBOOK = [] + + +def sign(name): + """sign your name in the guestbook""" + GUESTBOOK.append(name) + + +def view_guestbook(): + """view the guestbook""" + return GUESTBOOK + + +# define flow + + +@ai_flow +def guestbook_flow(): + run_ai_task( + """ + Add your name to the list using the `sign` tool. All assistants must + sign their names for the task to be complete. You can read the sign to + see if that has happened yet. You can not sign for another assistant. + """, + assistants=[a, b, c], + tools=[sign, view_guestbook], + ) + + +# run test + + +def test(): + guestbook_flow() + assert GUESTBOOK == ["a", "b", "c"]