Skip to content

Commit

Permalink
Feat: Add support for YC managed Airflow (#3396)
Browse files Browse the repository at this point in the history
  • Loading branch information
petrikoro authored Nov 21, 2024
1 parent 4fd59ba commit 463932a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/integrations/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,7 @@ default_scheduler:
type: mwaa
environment: <The MWAA Environment Name>
```

### YC Airflow

SQLMesh fully supports Airflow hosted on Yandex [managed Airflow instances](https://yandex.cloud/en/services/managed-airflow) - see the [configuration reference page](../reference/configuration.md#yc-airflow) for more information.
11 changes: 11 additions & 0 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ See [Airflow Integration Guide](../integrations/airflow.md) for information abou

The Google Cloud Composer scheduler type shares the same configuration options as the `airflow` type, except for `username` and `password`. Cloud Composer relies on `gcloud` authentication, so the `username` and `password` options are not required.

#### YC Airflow

**Type:** `yc_airflow`

Yandex Managed Airflow shares similar configuration options with the standard `airflow` type, with the following exceptions:

- `max_snapshot_ids_per_request`: This option is deprecated and not supported.
- Authentication: YC Airflow requires additional credentials, including both a `token` and a combination of `username` and `password`.

Unlike the `airflow` type, YC Airflow leverages Yandex Cloud's internal authentication mechanisms. Therefore, all requests to the Airflow API must include a valid Yandex Cloud IAM-token for authentication.

## Gateway/connection defaults

The default gateway and connection keys specify what should happen when gateways or connections are not explicitly specified. Find additional details in the configuration overview page [gateway/connection defaults section](../guides/configuration.md#gatewayconnection-defaults).
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@
BuiltInSchedulerConfig as BuiltInSchedulerConfig,
CloudComposerSchedulerConfig as CloudComposerSchedulerConfig,
MWAASchedulerConfig as MWAASchedulerConfig,
YCAirflowSchedulerConfig as YCAirflowSchedulerConfig,
)
52 changes: 52 additions & 0 deletions sqlmesh/core/config/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,58 @@ def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
)


class YCAirflowSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig):
"""The Yandex Cloud Managed Airflow Scheduler configuration.
Args:
airflow_url: The URL of the Airflow Webserver.
username: The Airflow username.
password: The Airflow password.
dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking for
whether a DAG has been created.
backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver (Deprecated).
use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
over the default catalog value set on the Airflow side.
token: The IAM-token for API authentification.
"""

airflow_url: str
username: str
password: str
token: str
dag_run_poll_interval_secs: int = 10
dag_creation_poll_interval_secs: int = 30
dag_creation_max_retry_attempts: int = 10

backfill_concurrent_tasks: int = 4
ddl_concurrent_tasks: int = 4

use_state_connection: bool = False

default_catalog_override: t.Optional[str] = None

_concurrent_tasks_validator = concurrent_tasks_validator

type_: Literal["yc_airflow"] = Field(alias="type", default="yc_airflow")

def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
session = Session()

session.auth = (self.username, self.password)
session.headers.update({"X-Cloud-Authorization": f"Bearer {self.token}"})

return AirflowClient(
session=session,
airflow_url=self.airflow_url,
console=console,
)


class CloudComposerSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig, extra="allow"):
"""The Google Cloud Composer configuration.
Expand Down

0 comments on commit 463932a

Please sign in to comment.