diff --git a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py index 080c6572d..d902c2e9c 100644 --- a/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py +++ b/estuary-cdk/estuary_cdk/shim_airbyte_cdk.py @@ -1,5 +1,7 @@ from dataclasses import dataclass +import io from logging import Logger +import sys from pydantic import Field from typing import Any, ClassVar, Annotated, Callable, Awaitable, List, Literal import asyncio @@ -294,7 +296,6 @@ async def _run( config: EndpointConfig, connector_state: ConnectorState, ) -> None: - airbyte_streams: list[ConfiguredAirbyteStream] = [ ConfiguredAirbyteStream( stream=AirbyteStream( @@ -336,6 +337,14 @@ async def _run( AirbyteStateMessage(**rs.state) for _, rs in index.values() if rs.state ] + # Redirect any rogue `print` or similar outputs that are sent to stdout + # in the airbyte code to stderr instead so that they are interpreted as + # logs rather than captured documents or checkpoints. It's ok to + # re-assign stdout like this here without keeping a reference to it + # because the `task` output has already captured a reference to the + # original stdout. + sys.stdout = sys.stderr + for message in self.delegate.read( task.log, config, airbyte_catalog, airbyte_states ):