Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: decompose complex function into smaller ones for readability #5517

Merged
merged 4 commits into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 82 additions & 66 deletions src/backend/base/langflow/custom/custom_component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,78 +868,94 @@ async def build_results(self):
raise

async def _build_results(self) -> tuple[dict, dict]:
results = {}
artifacts = {}
results, artifacts = {}, {}

self._pre_run_setup_if_needed()
self._handle_tool_mode()

for output in self._get_outputs_to_process():
self._current_output = output.name
result = await self._get_output_result(output)
results[output.name] = result
artifacts[output.name] = self._build_artifact(result)
self._log_output(output)

self._finalize_results(results, artifacts)
return results, artifacts

def _pre_run_setup_if_needed(self):
if hasattr(self, "_pre_run_setup"):
self._pre_run_setup()
if hasattr(self, "outputs"):
if any(getattr(_input, "tool_mode", False) for _input in self.inputs):
self._append_tool_to_outputs_map()
for output in self._outputs_map.values():
# Build the output if it's connected to some other vertex
# or if it's not connected to any vertex
if (
not self._vertex
or not self._vertex.outgoing_edges
or output.name in self._vertex.edges_source_names
):
if output.method is None:
msg = f"Output {output.name} does not have a method defined."
raise ValueError(msg)
self._current_output = output.name
method: Callable = getattr(self, output.method)
if output.cache and output.value != UNDEFINED:
results[output.name] = output.value
result = output.value
else:
# If the method is asynchronous, we need to await it
if inspect.iscoroutinefunction(method):
result = await method()
else:
result = await asyncio.to_thread(method)
if (
self._vertex is not None
and isinstance(result, Message)
and result.flow_id is None
and self._vertex.graph.flow_id is not None
):
result.set_flow_id(self._vertex.graph.flow_id)
results[output.name] = result
output.value = result

custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, dict | Data | str):
custom_repr = result
if not isinstance(custom_repr, str):
custom_repr = str(custom_repr)
raw = result
if self.status is None:
artifact_value = raw
else:
artifact_value = self.status
raw = self.status

if hasattr(raw, "data") and raw is not None:
raw = raw.data
if raw is None:
raw = custom_repr

elif hasattr(raw, "model_dump") and raw is not None:
raw = raw.model_dump()
if raw is None and isinstance(result, dict | Data | str):
raw = result.data if isinstance(result, Data) else result
artifact_type = get_artifact_type(artifact_value, result)
raw, artifact_type = post_process_raw(raw, artifact_type)
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
artifacts[output.name] = artifact
self._output_logs[output.name] = self._logs
self._logs = []
self._current_output = ""

def _handle_tool_mode(self):
if hasattr(self, "outputs") and any(getattr(_input, "tool_mode", False) for _input in self.inputs):
self._append_tool_to_outputs_map()

def _should_process_output(self, output):
if not self._vertex or not self._vertex.outgoing_edges:
return True
return output.name in self._vertex.edges_source_names

def _get_outputs_to_process(self):
return (output for output in self._outputs_map.values() if self._should_process_output(output))

async def _get_output_result(self, output):
if output.cache and output.value != UNDEFINED:
return output.value

if output.method is None:
msg = f'Output "{output.name}" does not have a method defined.'
raise ValueError(msg)

method = getattr(self, output.method)
try:
result = await method() if inspect.iscoroutinefunction(method) else await asyncio.to_thread(method)
except TypeError as e:
msg = f'Error running method "{output.method}": {e}'
raise TypeError(msg) from e

if (
self._vertex is not None
and isinstance(result, Message)
and result.flow_id is None
and self._vertex.graph.flow_id is not None
):
result.set_flow_id(self._vertex.graph.flow_id)

output.value = result
return result

def _build_artifact(self, result):
custom_repr = self.custom_repr() or (result if isinstance(result, dict | Data | str) else str(result))

raw = self._process_raw_result(result)
artifact_type = get_artifact_type(self.status or raw, result)
raw, artifact_type = post_process_raw(raw, artifact_type)
return {"repr": custom_repr, "raw": raw, "type": artifact_type}

def _process_raw_result(self, result):
if self.status:
raw = self.status
elif hasattr(result, "data"):
raw = result.data
elif hasattr(result, "model_dump"):
raw = result.model_dump()
elif isinstance(result, dict | Data | str):
raw = result.data if isinstance(result, Data) else result
else:
raw = result
return raw

def _log_output(self, output):
self._output_logs[output.name] = self._logs
self._logs = []
self._current_output = ""

def _finalize_results(self, results, artifacts):
self._artifacts = artifacts
self._results = results
if self._tracing_service:
self._tracing_service.set_outputs(self.trace_name, results)
return results, artifacts

def custom_repr(self):
if self.repr_value == "":
Expand Down
Loading