Skip to content

Commit

Permalink
estuary-cdk: Add restart_interval override to allow connectors with…
Browse files Browse the repository at this point in the history
… known long-running tasks to restart less frequently
  • Loading branch information
jshearer committed Mar 11, 2024
1 parent 799c8a2 commit a31b377
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
8 changes: 7 additions & 1 deletion estuary-cdk/estuary_cdk/capture/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,13 @@ async def stop_on_elapsed_interval(interval: int) -> None:
# Gracefully exit after the capture interval has elapsed.
# We don't do this within the TaskGroup because we don't
# want to block on it.
asyncio.create_task(stop_on_elapsed_interval(open.capture.intervalSeconds))
asyncio.create_task(
stop_on_elapsed_interval(
int(open.capture.config.restart_interval.total_seconds())
if open.capture.config.restart_interval
else open.capture.intervalSeconds
)
)

async with asyncio.TaskGroup() as tg:

Expand Down
19 changes: 16 additions & 3 deletions estuary-cdk/estuary_cdk/flow.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
import abc
from dataclasses import dataclass
from pydantic import BaseModel, NonNegativeInt, PositiveInt
from typing import Any, Literal, TypeVar, Generic, Literal
from datetime import timedelta
from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt
from typing import Any, Literal, Optional, TypeVar, Generic, Literal

from .pydantic_polyfill import GenericModel


class BaseEndpointConfig(abc.ABC, BaseModel, extra="forbid"):
"""
BaseEndpointConfig defines the endpoint config attribute(s) shared by all connectors.
"""

# If unset, use the default `intervalSeconds` provided as part of the Open request
restart_interval: Optional[timedelta] = Field(
default=None, description="How long before the connector restarts automatically"
)


# The type of this invoked connector.
ConnectorType = Literal[
"IMAGE", # We're running with the context of a container image.
"LOCAL", # We're running directly on the host as a local process.
]

# Generic type of a connector's endpoint configuration.
EndpointConfig = TypeVar("EndpointConfig")
EndpointConfig = TypeVar("EndpointConfig", bound=BaseEndpointConfig)

# Generic type of a connector's resource configuration.
ResourceConfig = TypeVar("ResourceConfig", bound=BaseModel)
Expand Down

0 comments on commit a31b377

Please sign in to comment.