Skip to content

Commit

Permalink
feat: implement LangfuseTracer (langflow-ai#3398)
Browse files Browse the repository at this point in the history
* feat: implement langfuse tracer class for event monitoring

* feat: integrate langfuse trace class into trace service
  • Loading branch information
italojohnny authored Aug 26, 2024
1 parent ee6fc45 commit b5fc559
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
137 changes: 137 additions & 0 deletions src/backend/base/langflow/services/tracing/langfuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import os
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID
from datetime import datetime

from loguru import logger

from langflow.services.tracing.base import BaseTracer
from langflow.services.tracing.schema import Log

if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
from langchain.callbacks.base import BaseCallbackHandler


class LangFuseTracer(BaseTracer):
flow_id: str

def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
self.project_name = project_name
self.trace_name = trace_name
self.trace_type = trace_type
self.trace_id = trace_id
self.flow_id = trace_name.split(" - ")[-1]
self.last_span = None
self.spans: dict = {}
self._ready: bool = self.setup_langfuse()

@property
def ready(self):
return self._ready

def setup_langfuse(self) -> bool:
try:
from langfuse import Langfuse
from langfuse.callback.langchain import LangchainCallbackHandler

config = self._get_config()
if not all(config.values()):
raise ValueError("Missing Langfuse configuration")

self._client = Langfuse(**config)
self.trace = self._client.trace(id=str(self.trace_id), name=self.flow_id)

config |= {
"trace_name": self.flow_id,
"stateful_client": self.trace,
"update_stateful_client": True,
}
self._callback = LangchainCallbackHandler(**config)

except ImportError:
logger.error("Could not import langfuse. Please install it with `pip install langfuse`.")
return False

except Exception as e:
logger.debug(f"Error setting up LangSmith tracer: {e}")
return False

return True

def add_trace(
self,
trace_id: str,
trace_name: str,
trace_type: str,
inputs: Dict[str, Any],
metadata: Dict[str, Any] | None = None,
vertex: Optional["Vertex"] = None,
):
start_time = datetime.utcnow()
if not self._ready:
return

_metadata: dict = {}
_metadata |= {"trace_type": trace_type} if trace_type else {}
_metadata |= metadata if metadata else {}

_name = trace_name.removesuffix(f" ({trace_id})")
content_span = {
"name": _name,
"input": inputs,
"metadata": _metadata,
"start_time": start_time,
}

if self.last_span:
span = self.last_span.span(**content_span)
else:
span = self.trace.span(**content_span)

self.last_span = span
self.spans[trace_id] = span

def end_trace(
self,
trace_id: str,
trace_name: str,
outputs: Dict[str, Any] | None = None,
error: Exception | None = None,
logs: list[Log | dict] = [],
):
end_time = datetime.utcnow()
if not self._ready:
return

span = self.spans.get(trace_id, None)
if span:
_output: dict = {}
_output |= outputs if outputs else {}
_output |= {"error": str(error)} if error else {}
_output |= {"logs": logs} if logs else {}
content = {"output": _output, "end_time": end_time}
span.update(**content)

def end(
self,
inputs: dict[str, Any],
outputs: Dict[str, Any],
error: Exception | None = None,
metadata: dict[str, Any] | None = None,
):
if not self._ready:
return

self._client.flush()

def get_langchain_callback(self) -> Optional["BaseCallbackHandler"]:
if not self._ready:
return None
return None # self._callback

def _get_config(self):
secret_key = os.getenv("LANGFUSE_SECRET_KEY", None)
public_key = os.getenv("LANGFUSE_PUBLIC_KEY", None)
host = os.getenv("LANGFUSE_HOST", None)
return {"secret_key": secret_key, "public_key": public_key, "host": host}
17 changes: 17 additions & 0 deletions src/backend/base/langflow/services/tracing/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ def _get_langwatch_tracer():
return LangWatchTracer


def _get_langfuse_tracer():
from langflow.services.tracing.langfuse import LangFuseTracer

return LangFuseTracer


class TracingService(Service):
name = "tracing_service"

Expand Down Expand Up @@ -101,6 +107,7 @@ async def initialize_tracers(self):
await self.start()
self._initialize_langsmith_tracer()
self._initialize_langwatch_tracer()
self._initialize_langfuse_tracer()
except Exception as e:
logger.debug(f"Error initializing tracers: {e}")

Expand All @@ -127,6 +134,16 @@ def _initialize_langwatch_tracer(self):
trace_id=self.run_id,
)

def _initialize_langfuse_tracer(self):
self.project_name = os.getenv("LANGCHAIN_PROJECT", "Langflow")
langfuse_tracer = _get_langfuse_tracer()
self._tracers["langfuse"] = langfuse_tracer(
trace_name=self.run_name,
trace_type="chain",
project_name=self.project_name,
trace_id=self.run_id,
)

def set_run_name(self, name: str):
self.run_name = name

Expand Down

0 comments on commit b5fc559

Please sign in to comment.