From 1bf5f6d52ddaa8c1b57b210582bfba3dcbbd612d Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 9 Jul 2024 17:10:19 +0000 Subject: [PATCH] [autofix.ci] apply automated fixes --- .../custom/custom_component/component.py | 4 +- src/backend/base/langflow/graph/graph/base.py | 1 - .../base/langflow/services/tracing/base.py | 12 ++- .../base/langflow/services/tracing/service.py | 88 +++++-------------- 4 files changed, 33 insertions(+), 72 deletions(-) diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index c25c0763fe5..82947398279 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -120,9 +120,7 @@ async def build_results(self): metadata = self.get_trace_as_metadata() trace_name = f"{self.display_name} ({self.vertex.id})" - async with self._tracing_service.trace_context( - self, trace_name, inputs, metadata - ): + async with self._tracing_service.trace_context(self, trace_name, inputs, metadata): _results, _artifacts = await self._build_results() self._tracing_service.set_outputs(trace_name, _results) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index cd69955bf53..b8e95652815 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1,5 +1,4 @@ import asyncio -import traceback import uuid from collections import defaultdict, deque from datetime import datetime, timezone diff --git a/src/backend/base/langflow/services/tracing/base.py b/src/backend/base/langflow/services/tracing/base.py index b42d10af603..b3648f752c5 100644 --- a/src/backend/base/langflow/services/tracing/base.py +++ b/src/backend/base/langflow/services/tracing/base.py @@ -17,12 +17,20 @@ def ready(self): @abstractmethod def add_trace( - self, trace_id: str, trace_name: str, trace_type: str, inputs: Dict[str, Any], metadata: Dict[str, Any] | None = None, vertex: Optional["Vertex"] = None + self, + trace_id: str, + trace_name: str, + trace_type: str, + inputs: Dict[str, Any], + metadata: Dict[str, Any] | None = None, + vertex: Optional["Vertex"] = None, ): raise NotImplementedError @abstractmethod - def end_trace(self, trace_id: str, trace_name: str, outputs: Dict[str, Any] | None = None, error: Exception | None = None): + def end_trace( + self, trace_id: str, trace_name: str, outputs: Dict[str, Any] | None = None, error: Exception | None = None + ): raise NotImplementedError @abstractmethod diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index 763f2f81e3f..24e6248c512 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -27,9 +27,7 @@ class TracingService(Service): name = "tracing_service" - def __init__( - self, settings_service: "SettingsService", monitor_service: "MonitorService" - ): + def __init__(self, settings_service: "SettingsService", monitor_service: "MonitorService"): self.settings_service = settings_service self.monitor_service = monitor_service self.inputs: dict[str, dict] = defaultdict(dict) @@ -142,15 +140,11 @@ def _start_traces( if not tracer.ready: continue try: - tracer.add_trace( - trace_id, trace_name, trace_type, inputs, metadata, vertex - ) + tracer.add_trace(trace_id, trace_name, trace_type, inputs, metadata, vertex) except Exception as e: logger.error(f"Error starting trace {trace_name}: {e}") - def _end_traces( - self, trace_id: str, trace_name: str, error: Exception | None = None - ): + def _end_traces(self, trace_id: str, trace_name: str, error: Exception | None = None): for tracer in self._tracers.values(): if not tracer.ready: continue @@ -169,9 +163,7 @@ def _end_all_traces(self, outputs: dict, error: Exception | None = None): if not tracer.ready: continue try: - tracer.end( - self.inputs, outputs=self.outputs, error=error, metadata=outputs - ) + tracer.end(self.inputs, outputs=self.outputs, error=error, metadata=outputs) except Exception as e: logger.error(f"Error ending all traces: {e}") @@ -240,9 +232,7 @@ def _cleanup_inputs(self, inputs: Dict[str, Any]): class LangSmithTracer(BaseTracer): - def __init__( - self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID - ): + def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): from langsmith.run_trees import RunTree self.trace_name = trace_name @@ -256,9 +246,7 @@ def __init__( run_type=self.trace_type, id=self.trace_id, ) - self._run_tree.add_event( - {"name": "Start", "time": datetime.now(timezone.utc).isoformat()} - ) + self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()}) self._children: dict[str, RunTree] = {} self._ready = self.setup_langsmith() except Exception as e: @@ -275,9 +263,7 @@ def setup_langsmith(self): self._client = Client() except ImportError: - logger.error( - "Could not import langsmith. Please install it with `pip install langsmith`." - ) + logger.error("Could not import langsmith. Please install it with `pip install langsmith`.") return False os.environ["LANGCHAIN_TRACING_V2"] = "true" return True @@ -360,14 +346,10 @@ def _error_to_string(self, error: Optional[Exception]): error_message = None if error: try: # python < 3.10 - string_stacktrace = traceback.format_exception( - etype=type(error), value=error, tb=error.__traceback__ - ) # type: ignore + string_stacktrace = traceback.format_exception(etype=type(error), value=error, tb=error.__traceback__) # type: ignore except: # python 3.10+ string_stacktrace = traceback.format_exception(err) # type: ignore - error_message = ( - f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}" - ) + error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}" return error_message def add_log(self, trace_name: str, log: Log): @@ -393,9 +375,7 @@ def end( class LangWatchTracer(BaseTracer): - def __init__( - self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID - ): + def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): self.trace_name = trace_name self.trace_type = trace_type self.project_name = project_name @@ -432,17 +412,14 @@ def setup_langwatch(self): self._client = langwatch except ImportError: - logger.error( - "Could not import langwatch. Please install it with `pip install langwatch`." - ) + logger.error("Could not import langwatch. Please install it with `pip install langwatch`.") return False return True def _convert_trace_type(self, trace_type: str): trace_type_: "SpanTypes" = ( cast("SpanTypes", trace_type) - if trace_type - in ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"] + if trace_type in ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"] else "span" ) return trace_type_ @@ -460,15 +437,8 @@ def add_trace( # If user is not using session_id, then it becomes the same as flow_id, but # we don't want to have an infinite thread with all the flow messages - if ( - "session_id" in inputs - and "flow_id" in inputs - and inputs["flow_id"] != inputs["session_id"] - ): - self.trace.update( - metadata=(self.trace.metadata or {}) - | {"thread_id": inputs["session_id"]} - ) + if "session_id" in inputs and "flow_id" in inputs and inputs["flow_id"] != inputs["session_id"]: + self.trace.update(metadata=(self.trace.metadata or {}) | {"thread_id": inputs["session_id"]}) name_without_id = " (".join(trace_name.split(" (")[0:-1]) @@ -478,12 +448,9 @@ def add_trace( name=name_without_id, type=trace_type_, parent=( - [ - span - for key, span in self.spans.items() - for edge in vertex.incoming_edges - if key == edge.source_id - ][-1] + [span for key, span in self.spans.items() for edge in vertex.incoming_edges if key == edge.source_id][ + -1 + ] if vertex and len(vertex.incoming_edges) > 0 else self.trace.root_span ), @@ -509,13 +476,9 @@ def end_trace( and "model_output" in outputs and "text_output" not in outputs ): - self.spans[trace_id].update( - metrics={"prompt_tokens": 0, "completion_tokens": 0} - ) + self.spans[trace_id].update(metrics={"prompt_tokens": 0, "completion_tokens": 0}) - self.spans[trace_id].end( - output=self._convert_to_langwatch_types(outputs), error=error - ) + self.spans[trace_id].end(output=self._convert_to_langwatch_types(outputs), error=error) def end( self, @@ -531,10 +494,7 @@ def end( ) if metadata and "flow_name" in metadata: - self.trace.update( - metadata=(self.trace.metadata or {}) - | {"labels": [f"Flow: {metadata['flow_name']}"]} - ) + self.trace.update(metadata=(self.trace.metadata or {}) | {"labels": [f"Flow: {metadata['flow_name']}"]}) self.trace.deferred_send_spans() def _convert_to_langwatch_types(self, io_dict: Optional[Dict[str, Any]]): @@ -563,12 +523,8 @@ def _convert_to_langwatch_type(self, value): elif isinstance(value, Message): if "prompt" in value: prompt = value.load_lc_prompt() - if len(prompt.input_variables) == 0 and all( - isinstance(m, BaseMessage) for m in prompt.messages - ): - value = langchain_messages_to_chat_messages( - [cast(list[BaseMessage], prompt.messages)] - ) + if len(prompt.input_variables) == 0 and all(isinstance(m, BaseMessage) for m in prompt.messages): + value = langchain_messages_to_chat_messages([cast(list[BaseMessage], prompt.messages)]) else: value = cast(dict, value.load_lc_prompt()) elif value.sender: