Skip to content

Commit

Permalink
shim-airbyte-cdk: redirect rogue outputs to stdout
Browse files Browse the repository at this point in the history
Some imported connectors have errant `print` calls or similar that output
directly to stdout, which Flow will otherwise interpret as a captured document
or checkpoint.

This is really ugly, but to prevent it from happening we can redirect stdout to
stderr after the task outputs have been initialized. That way the printed output
is treated as logs instead of capture data.
  • Loading branch information
williamhbaker committed Nov 8, 2024
1 parent a492d45 commit 515f8c2
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion estuary-cdk/estuary_cdk/shim_airbyte_cdk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dataclasses import dataclass
import io
from logging import Logger
import sys
from pydantic import Field
from typing import Any, ClassVar, Annotated, Callable, Awaitable, List, Literal
import asyncio
Expand Down Expand Up @@ -294,7 +296,6 @@ async def _run(
config: EndpointConfig,
connector_state: ConnectorState,
) -> None:

airbyte_streams: list[ConfiguredAirbyteStream] = [
ConfiguredAirbyteStream(
stream=AirbyteStream(
Expand Down Expand Up @@ -336,6 +337,14 @@ async def _run(
AirbyteStateMessage(**rs.state) for _, rs in index.values() if rs.state
]

# Redirect any rogue `print` or similar outputs that are sent to stdout
# in the airbyte code to stderr instead so that they are interpreted as
# logs instead of captured documents or checkpoints. It's ok to
# re-assign stdout like this here without keeping a reference to it
# because the `task` output has already captured a reference to the
# original stdout.
sys.stdout = sys.stderr

for message in self.delegate.read(
task.log, config, airbyte_catalog, airbyte_states
):
Expand Down

0 comments on commit 515f8c2

Please sign in to comment.