Skip to content

Commit

Permalink
refactor: decompose complex function into smaller ones for readability
Browse files Browse the repository at this point in the history
  • Loading branch information
italojohnny committed Jan 2, 2025
1 parent 8ea4405 commit 3cc0ae3
Showing 1 changed file with 81 additions and 66 deletions.
147 changes: 81 additions & 66 deletions src/backend/base/langflow/custom/custom_component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,78 +867,93 @@ async def build_results(self):
raise

async def _build_results(self) -> tuple[dict, dict]:
results = {}
artifacts = {}
results, artifacts = {}, {}

self._pre_run_setup_if_needed()
self._handle_tool_mode()

for output in self._get_outputs_to_process():
self._current_output = output.name
result = await self._get_output_result(output)
results[output.name] = result
artifacts[output.name] = self._build_artifact(result)
self._log_output(output)

self._finalize_results(results, artifacts)
return results, artifacts

def _pre_run_setup_if_needed(self):
if hasattr(self, "_pre_run_setup"):
self._pre_run_setup()
if hasattr(self, "outputs"):
if any(getattr(_input, "tool_mode", False) for _input in self.inputs):
self._append_tool_to_outputs_map()
for output in self._outputs_map.values():
# Build the output if it's connected to some other vertex
# or if it's not connected to any vertex
if (
not self._vertex
or not self._vertex.outgoing_edges
or output.name in self._vertex.edges_source_names
):
if output.method is None:
msg = f"Output {output.name} does not have a method defined."
raise ValueError(msg)
self._current_output = output.name
method: Callable = getattr(self, output.method)
if output.cache and output.value != UNDEFINED:
results[output.name] = output.value
result = output.value
else:
# If the method is asynchronous, we need to await it
if inspect.iscoroutinefunction(method):
result = await method()
else:
result = await asyncio.to_thread(method)
if (
self._vertex is not None
and isinstance(result, Message)
and result.flow_id is None
and self._vertex.graph.flow_id is not None
):
result.set_flow_id(self._vertex.graph.flow_id)
results[output.name] = result
output.value = result

custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, dict | Data | str):
custom_repr = result
if not isinstance(custom_repr, str):
custom_repr = str(custom_repr)
raw = result
if self.status is None:
artifact_value = raw
else:
artifact_value = self.status
raw = self.status

if hasattr(raw, "data") and raw is not None:
raw = raw.data
if raw is None:
raw = custom_repr

elif hasattr(raw, "model_dump") and raw is not None:
raw = raw.model_dump()
if raw is None and isinstance(result, dict | Data | str):
raw = result.data if isinstance(result, Data) else result
artifact_type = get_artifact_type(artifact_value, result)
raw, artifact_type = post_process_raw(raw, artifact_type)
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
artifacts[output.name] = artifact
self._output_logs[output.name] = self._logs
self._logs = []
self._current_output = ""

def _handle_tool_mode(self):
if hasattr(self, "outputs") and any(getattr(_input, "tool_mode", False) for _input in self.inputs):
self._append_tool_to_outputs_map()

def _get_outputs_to_process(self):
return [
output
for output in self._outputs_map.values()
if not self._vertex or not self._vertex.outgoing_edges or output.name in self._vertex.edges_source_names
]

async def _get_output_result(self, output):
if output.cache and output.value != UNDEFINED:
return output.value

if output.method is None:
msg = f'Output "{output.name}" does not have a method defined.'
raise ValueError(msg)

method = getattr(self, output.method)
try:
result = await method() if inspect.iscoroutinefunction(method) else await asyncio.to_thread(method)
except TypeError as e:
msg = f'Error running method "{output.method}": {e}'
raise TypeError(msg) from e

if (
self._vertex is not None
and isinstance(result, Message)
and result.flow_id is None
and self._vertex.graph.flow_id is not None
):
result.set_flow_id(self._vertex.graph.flow_id)

output.value = result
return result

def _build_artifact(self, result):
custom_repr = self.custom_repr() or (result if isinstance(result, dict | Data | str) else str(result))

raw = self._process_raw_result(result)
artifact_type = get_artifact_type(self.status or raw, result)
raw, artifact_type = post_process_raw(raw, artifact_type)
return {"repr": custom_repr, "raw": raw, "type": artifact_type}

def _process_raw_result(self, result):
if self.status:
raw = self.status
elif hasattr(result, "data"):
raw = result.data
elif hasattr(result, "model_dump"):
raw = result.model_dump()
elif isinstance(result, dict | Data | str):
raw = result.data if isinstance(result, Data) else result
else:
raw = result
return raw

def _log_output(self, output):
self._output_logs[output.name] = self._logs
self._logs = []
self._current_output = ""

def _finalize_results(self, results, artifacts):
self._artifacts = artifacts
self._results = results
if self._tracing_service:
self._tracing_service.set_outputs(self.trace_name, results)
return results, artifacts

def custom_repr(self):
if self.repr_value == "":
Expand Down

0 comments on commit 3cc0ae3

Please sign in to comment.