[autofix.ci] apply automated fixes
autofix-ci[bot] authored Jul 9, 2024
1 parent 9e56e68 commit 1bf5f6d
Showing 4 changed files with 33 additions and 72 deletions.
@@ -120,9 +120,7 @@ async def build_results(self):
metadata = self.get_trace_as_metadata()

trace_name = f"{self.display_name} ({self.vertex.id})"
- async with self._tracing_service.trace_context(
-     self, trace_name, inputs, metadata
- ):
+ async with self._tracing_service.trace_context(self, trace_name, inputs, metadata):
_results, _artifacts = await self._build_results()
self._tracing_service.set_outputs(trace_name, _results)

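For context, the reformatted call site above treats TracingService.trace_context as an async context manager. The context manager itself is not part of this diff, so the following is a hedged sketch only, wired to the _start_traces/_end_traces helpers that appear later in this commit; the parameter list, the trace_id choice, and the trace_type lookup are assumptions, not the project's actual implementation.

# Hedged sketch; the real TracingService.trace_context is not shown in this commit.
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional


class TracingServiceSketch:
    @asynccontextmanager
    async def trace_context(self, component, trace_name: str, inputs: Dict[str, Any],
                            metadata: Optional[Dict[str, Any]] = None):
        trace_id = trace_name  # assumption: the trace name doubles as the trace id
        trace_type = getattr(component, "trace_type", "chain")  # assumption
        self._start_traces(trace_id, trace_name, trace_type, inputs, metadata,
                           getattr(component, "vertex", None))
        try:
            yield self
        except Exception as e:
            self._end_traces(trace_id, trace_name, error=e)
            raise
        else:
            self._end_traces(trace_id, trace_name, error=None)

    def _start_traces(self, trace_id, trace_name, trace_type, inputs, metadata, vertex):
        ...  # in the real service this fans out to each tracer's add_trace, as shown below

    def _end_traces(self, trace_id, trace_name, error=None):
        ...  # in the real service this fans out to each tracer's end_trace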
1 change: 0 additions & 1 deletion src/backend/base/langflow/graph/graph/base.py
@@ -1,5 +1,4 @@
import asyncio
import traceback
import uuid
from collections import defaultdict, deque
from datetime import datetime, timezone
12 changes: 10 additions & 2 deletions src/backend/base/langflow/services/tracing/base.py
@@ -17,12 +17,20 @@ def ready(self):

@abstractmethod
def add_trace(
- self, trace_id: str, trace_name: str, trace_type: str, inputs: Dict[str, Any], metadata: Dict[str, Any] | None = None, vertex: Optional["Vertex"] = None
+ self,
+ trace_id: str,
+ trace_name: str,
+ trace_type: str,
+ inputs: Dict[str, Any],
+ metadata: Dict[str, Any] | None = None,
+ vertex: Optional["Vertex"] = None,
):
raise NotImplementedError

@abstractmethod
- def end_trace(self, trace_id: str, trace_name: str, outputs: Dict[str, Any] | None = None, error: Exception | None = None):
+ def end_trace(
+     self, trace_id: str, trace_name: str, outputs: Dict[str, Any] | None = None, error: Exception | None = None
+ ):
raise NotImplementedError

@abstractmethod
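The signatures above are the contract that the concrete tracers later in this commit (LangSmithTracer, LangWatchTracer) implement. As an illustration only, a no-op subclass covering just the members visible in this diff could look like the sketch below; the real BaseTracer very likely declares further abstract members (for example end and add_log) outside the shown hunk, so those would also need implementations before the class could be instantiated.

# Illustrative no-op tracer. Assumes BaseTracer is importable from
# langflow.services.tracing.base (the file shown above) and that `ready` is read as an
# attribute, as service.py does with `if not tracer.ready:`. The constructor mirrors the
# concrete tracers' constructors; anything beyond the visible hunks is an assumption.
from typing import Any, Dict, Optional
from uuid import UUID

from langflow.services.tracing.base import BaseTracer


class NoOpTracer(BaseTracer):
    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        self._ready = True

    @property
    def ready(self):
        return self._ready

    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: Dict[str, Any],
        metadata: Dict[str, Any] | None = None,
        vertex: Optional["Vertex"] = None,
    ):
        # A real tracer would open a span or run here; this sketch intentionally does nothing.
        pass

    def end_trace(
        self, trace_id: str, trace_name: str, outputs: Dict[str, Any] | None = None, error: Exception | None = None
    ):
        pass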
88 changes: 22 additions & 66 deletions src/backend/base/langflow/services/tracing/service.py
@@ -27,9 +27,7 @@
class TracingService(Service):
name = "tracing_service"

- def __init__(
-     self, settings_service: "SettingsService", monitor_service: "MonitorService"
- ):
+ def __init__(self, settings_service: "SettingsService", monitor_service: "MonitorService"):
self.settings_service = settings_service
self.monitor_service = monitor_service
self.inputs: dict[str, dict] = defaultdict(dict)
@@ -142,15 +140,11 @@ def _start_traces(
if not tracer.ready:
continue
try:
- tracer.add_trace(
-     trace_id, trace_name, trace_type, inputs, metadata, vertex
- )
+ tracer.add_trace(trace_id, trace_name, trace_type, inputs, metadata, vertex)
except Exception as e:
logger.error(f"Error starting trace {trace_name}: {e}")

- def _end_traces(
-     self, trace_id: str, trace_name: str, error: Exception | None = None
- ):
+ def _end_traces(self, trace_id: str, trace_name: str, error: Exception | None = None):
for tracer in self._tracers.values():
if not tracer.ready:
continue
@@ -169,9 +163,7 @@ def _end_all_traces(self, outputs: dict, error: Exception | None = None):
if not tracer.ready:
continue
try:
- tracer.end(
-     self.inputs, outputs=self.outputs, error=error, metadata=outputs
- )
+ tracer.end(self.inputs, outputs=self.outputs, error=error, metadata=outputs)
except Exception as e:
logger.error(f"Error ending all traces: {e}")

@@ -240,9 +232,7 @@ def _cleanup_inputs(self, inputs: Dict[str, Any]):


class LangSmithTracer(BaseTracer):
- def __init__(
-     self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID
- ):
+ def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
from langsmith.run_trees import RunTree

self.trace_name = trace_name
@@ -256,9 +246,7 @@ def __init__(
run_type=self.trace_type,
id=self.trace_id,
)
- self._run_tree.add_event(
-     {"name": "Start", "time": datetime.now(timezone.utc).isoformat()}
- )
+ self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()})
self._children: dict[str, RunTree] = {}
self._ready = self.setup_langsmith()
except Exception as e:
@@ -275,9 +263,7 @@ def setup_langsmith(self):

self._client = Client()
except ImportError:
- logger.error(
-     "Could not import langsmith. Please install it with `pip install langsmith`."
- )
+ logger.error("Could not import langsmith. Please install it with `pip install langsmith`.")
return False
os.environ["LANGCHAIN_TRACING_V2"] = "true"
return True
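setup_langsmith constructs a langsmith.Client and then forces LANGCHAIN_TRACING_V2 on. As a hedged aside, not part of this commit, the client usually reads the rest of its configuration from environment variables, so a deployment would typically export something like the following; exact variable names and defaults depend on the installed langsmith version.

# Typical LangSmith environment, shown as an assumption only.
import os

os.environ.setdefault("LANGCHAIN_API_KEY", "<your-langsmith-api-key>")
os.environ.setdefault("LANGCHAIN_PROJECT", "<your-project-name>")
# LANGCHAIN_TRACING_V2 is set to "true" by setup_langsmith itself, as shown above.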
@@ -360,14 +346,10 @@ def _error_to_string(self, error: Optional[Exception]):
error_message = None
if error:
try: # python < 3.10
- string_stacktrace = traceback.format_exception(
-     etype=type(error), value=error, tb=error.__traceback__
- ) # type: ignore
+ string_stacktrace = traceback.format_exception(etype=type(error), value=error, tb=error.__traceback__) # type: ignore
except: # python 3.10+

Check failure: GitHub Actions / Ruff Style Check (3.12), Ruff (E722) at src/backend/base/langflow/services/tracing/service.py:350:13: Do not use bare `except`
string_stacktrace = traceback.format_exception(err) # type: ignore

Check failure: GitHub Actions / Ruff Style Check (3.12), Ruff (F821) at src/backend/base/langflow/services/tracing/service.py:351:64: Undefined name `err`
- error_message = (
-     f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}"
- )
+ error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}"
return error_message
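The two Ruff failures flagged above (E722 bare except, F821 undefined name err) are pre-existing; this commit only reformats the surrounding lines. As a hedged sketch, not part of this commit, the version fallback could be written so both checks pass by catching the TypeError that Python 3.10+ raises for the old keyword form and by referring to the existing error parameter:

# Sketch of a Ruff-clean variant of _error_to_string; not what this commit changes.
def _error_to_string(self, error: Optional[Exception]):
    error_message = None
    if error:
        try:  # Python < 3.10 keyword signature
            string_stacktrace = traceback.format_exception(
                etype=type(error), value=error, tb=error.__traceback__
            )  # type: ignore
        except TypeError:  # Python 3.10+ rejects the etype/value/tb keywords
            string_stacktrace = traceback.format_exception(error)  # type: ignore
        error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}"
    return error_message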

def add_log(self, trace_name: str, log: Log):
@@ -393,9 +375,7 @@ def end(


class LangWatchTracer(BaseTracer):
- def __init__(
-     self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID
- ):
+ def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
self.trace_name = trace_name
self.trace_type = trace_type
self.project_name = project_name
@@ -432,17 +412,14 @@ def setup_langwatch(self):

self._client = langwatch
except ImportError:
- logger.error(
-     "Could not import langwatch. Please install it with `pip install langwatch`."
- )
+ logger.error("Could not import langwatch. Please install it with `pip install langwatch`.")
return False
return True

def _convert_trace_type(self, trace_type: str):
trace_type_: "SpanTypes" = (
cast("SpanTypes", trace_type)
- if trace_type
- in ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"]
+ if trace_type in ["span", "llm", "chain", "tool", "agent", "guardrail", "rag"]
else "span"
)
return trace_type_
@@ -460,15 +437,8 @@ def add_trace(

# If user is not using session_id, then it becomes the same as flow_id, but
# we don't want to have an infinite thread with all the flow messages
- if (
-     "session_id" in inputs
-     and "flow_id" in inputs
-     and inputs["flow_id"] != inputs["session_id"]
- ):
-     self.trace.update(
-         metadata=(self.trace.metadata or {})
-         | {"thread_id": inputs["session_id"]}
-     )
+ if "session_id" in inputs and "flow_id" in inputs and inputs["flow_id"] != inputs["session_id"]:
+     self.trace.update(metadata=(self.trace.metadata or {}) | {"thread_id": inputs["session_id"]})

name_without_id = " (".join(trace_name.split(" (")[0:-1])
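The " (" join above strips only the trailing "(vertex id)" suffix while tolerating display names that themselves contain " (". A quick worked example with a hypothetical trace name:

# Hypothetical value, for illustration only.
trace_name = "Prompt (v2) Component (PromptComponent-1a2b3c)"
name_without_id = " (".join(trace_name.split(" (")[0:-1])
print(name_without_id)  # Prompt (v2) Component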

@@ -478,12 +448,9 @@
name=name_without_id,
type=trace_type_,
parent=(
- [
-     span
-     for key, span in self.spans.items()
-     for edge in vertex.incoming_edges
-     if key == edge.source_id
- ][-1]
+ [span for key, span in self.spans.items() for edge in vertex.incoming_edges if key == edge.source_id][
+     -1
+ ]
if vertex and len(vertex.incoming_edges) > 0
else self.trace.root_span
),
@@ -509,13 +476,9 @@ def end_trace(
and "model_output" in outputs
and "text_output" not in outputs
):
- self.spans[trace_id].update(
-     metrics={"prompt_tokens": 0, "completion_tokens": 0}
- )
+ self.spans[trace_id].update(metrics={"prompt_tokens": 0, "completion_tokens": 0})

- self.spans[trace_id].end(
-     output=self._convert_to_langwatch_types(outputs), error=error
- )
+ self.spans[trace_id].end(output=self._convert_to_langwatch_types(outputs), error=error)

def end(
self,
@@ -531,10 +494,7 @@ def end(
)

if metadata and "flow_name" in metadata:
- self.trace.update(
-     metadata=(self.trace.metadata or {})
-     | {"labels": [f"Flow: {metadata['flow_name']}"]}
- )
+ self.trace.update(metadata=(self.trace.metadata or {}) | {"labels": [f"Flow: {metadata['flow_name']}"]})
self.trace.deferred_send_spans()

def _convert_to_langwatch_types(self, io_dict: Optional[Dict[str, Any]]):
@@ -563,12 +523,8 @@ def _convert_to_langwatch_type(self, value):
elif isinstance(value, Message):
if "prompt" in value:
prompt = value.load_lc_prompt()
- if len(prompt.input_variables) == 0 and all(
-     isinstance(m, BaseMessage) for m in prompt.messages
- ):
-     value = langchain_messages_to_chat_messages(
-         [cast(list[BaseMessage], prompt.messages)]
-     )
+ if len(prompt.input_variables) == 0 and all(isinstance(m, BaseMessage) for m in prompt.messages):
+     value = langchain_messages_to_chat_messages([cast(list[BaseMessage], prompt.messages)])
else:
value = cast(dict, value.load_lc_prompt())
elif value.sender:
