From 0393c1ffc8bb6b9513865e5b2c73a3b8eef29915 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 14 Nov 2024 14:38:46 -0800 Subject: [PATCH] Add basic asyncio support (#43944) Add the ability to get an async sqlalchemy session for all 3 dialects. This is meant to be sort of a hello world for asyncio support in airflow. It will be refined and extended in the future. But I think airflow ultimately really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler. --------- Co-authored-by: Kaxil Naik --- .github/workflows/basic-tests.yml | 17 +++++++--- airflow/settings.py | 34 ++++++++++++++++++- dev/breeze/tests/test_packages.py | 2 ++ generated/provider_dependencies.json | 3 ++ .../src/airflow/providers/mysql/provider.yaml | 1 + .../airflow/providers/postgres/provider.yaml | 1 + .../airflow/providers/sqlite/provider.yaml | 1 + scripts/ci/kubernetes/k8s_requirements.txt | 2 +- tests/utils/test_session.py | 12 +++++++ 9 files changed, 67 insertions(+), 6 deletions(-) diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index 22f5d0652c9b..14f0628e454b 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -196,10 +196,19 @@ jobs: working-directory: ./clients/python - name: "Install source version of required packages" run: | - breeze release-management prepare-provider-packages fab standard common.sql --package-format \ - wheel --skip-tag-check --version-suffix-for-pypi dev0 - pip install . dist/apache_airflow_providers_fab-*.whl \ - dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl + breeze release-management prepare-provider-packages \ + fab \ + standard \ + common.sql \ + sqlite \ + --package-format wheel \ + --skip-tag-check \ + --version-suffix-for-pypi dev0 + pip install . 
\ + dist/apache_airflow_providers_fab-*.whl \ + dist/apache_airflow_providers_standard-*.whl \ + dist/apache_airflow_providers_common_sql-*.whl \ + dist/apache_airflow_providers_sqlite-*.whl breeze release-management prepare-task-sdk-package --package-format wheel pip install ./dist/apache_airflow_task_sdk-*.whl - name: "Install Python client" diff --git a/airflow/settings.py b/airflow/settings.py index a3f99510adba..c3f32fa59d98 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -31,6 +31,7 @@ import pluggy from packaging.version import Version from sqlalchemy import create_engine, exc, text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import NullPool @@ -95,8 +96,17 @@ DONOT_MODIFY_HANDLERS: bool | None = None DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER")) +AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"} +""" +Mapping of sync URI scheme (dialect) to the async driver library used to build the async URI. 
+ +:meta private: +""" + engine: Engine Session: Callable[..., SASession] +async_engine: AsyncEngine +create_async_session: Callable[..., AsyncSession] # The JSON library to use for DAG Serialization and De-Serialization json = json @@ -199,13 +209,25 @@ def load_policy_plugins(pm: pluggy.PluginManager): pm.load_setuptools_entrypoints("airflow.policy") +def _get_async_conn_uri_from_sync(sync_uri): + scheme, rest = sync_uri.split(":", maxsplit=1) + scheme = scheme.split("+", maxsplit=1)[0] + aiolib = AIO_LIBS_MAPPING.get(scheme) + if aiolib: + return f"{scheme}+{aiolib}:{rest}" + else: + return sync_uri + + def configure_vars(): """Configure Global Variables from airflow.cfg.""" global SQL_ALCHEMY_CONN + global SQL_ALCHEMY_CONN_ASYNC global DAGS_FOLDER global PLUGINS_FOLDER global DONOT_MODIFY_HANDLERS SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN") + SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN) DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER")) @@ -441,6 +463,9 @@ def configure_orm(disable_connection_pool=False, pool_class=None): global Session global engine + global async_engine + global create_async_session + if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": # Skip DB initialization in unit tests, if DB tests are skipped Session = SkipDBTestsSession @@ -466,7 +491,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None): connect_args["check_same_thread"] = False engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True) - + async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True) + create_async_session = sessionmaker( + bind=async_engine, + autocommit=False, + autoflush=False, + class_=AsyncSession, + expire_on_commit=False, + ) mask_secret(engine.url.password) setup_event_handlers(engine) diff --git a/dev/breeze/tests/test_packages.py b/dev/breeze/tests/test_packages.py index fe3173b2e9dd..cccfd45e340f 100644 --- 
a/dev/breeze/tests/test_packages.py +++ b/dev/breeze/tests/test_packages.py @@ -211,6 +211,7 @@ def test_get_documentation_package_path(): """ "apache-airflow-providers-common-sql>=1.20.0b0", "apache-airflow>=2.8.0b0", + "asyncpg>=0.30.0", "psycopg2-binary>=2.9.4", """, id="beta0 suffix postgres", @@ -221,6 +222,7 @@ def test_get_documentation_package_path(): """ "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0", + "asyncpg>=0.30.0", "psycopg2-binary>=2.9.4", """, id="No suffix postgres", diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 5cc3aae764bc..bc3051911d6b 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -906,6 +906,7 @@ }, "mysql": { "deps": [ + "aiomysql>=0.2.0", "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0", "mysql-connector-python>=8.0.29", @@ -1085,6 +1086,7 @@ "deps": [ "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0", + "asyncpg>=0.30.0", "psycopg2-binary>=2.9.4" ], "devel-deps": [], @@ -1260,6 +1262,7 @@ }, "sqlite": { "deps": [ + "aiosqlite>=0.20.0", "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0" ], diff --git a/providers/src/airflow/providers/mysql/provider.yaml b/providers/src/airflow/providers/mysql/provider.yaml index 7a0c7d7ccbe4..28ec197214fd 100644 --- a/providers/src/airflow/providers/mysql/provider.yaml +++ b/providers/src/airflow/providers/mysql/provider.yaml @@ -76,6 +76,7 @@ dependencies: # Instead, if someone attempts to use it on MacOS, they will get explanatory error on how to install it - mysqlclient>=1.4.0; sys_platform != 'darwin' - mysql-connector-python>=8.0.29 + - aiomysql>=0.2.0 additional-extras: # only needed for backwards compatibility diff --git a/providers/src/airflow/providers/postgres/provider.yaml b/providers/src/airflow/providers/postgres/provider.yaml index 18c679ffb3d3..c6b6c1df2723 100644 --- 
a/providers/src/airflow/providers/postgres/provider.yaml +++ b/providers/src/airflow/providers/postgres/provider.yaml @@ -70,6 +70,7 @@ dependencies: - apache-airflow>=2.8.0 - apache-airflow-providers-common-sql>=1.20.0 - psycopg2-binary>=2.9.4 + - asyncpg>=0.30.0 additional-extras: - name: amazon diff --git a/providers/src/airflow/providers/sqlite/provider.yaml b/providers/src/airflow/providers/sqlite/provider.yaml index 8b50de2bf9e3..473c211966e6 100644 --- a/providers/src/airflow/providers/sqlite/provider.yaml +++ b/providers/src/airflow/providers/sqlite/provider.yaml @@ -57,6 +57,7 @@ versions: dependencies: - apache-airflow>=2.8.0 + - aiosqlite>=0.20.0 - apache-airflow-providers-common-sql>=1.20.0 integrations: diff --git a/scripts/ci/kubernetes/k8s_requirements.txt b/scripts/ci/kubernetes/k8s_requirements.txt index 2d00510e3826..db62ee859638 100644 --- a/scripts/ci/kubernetes/k8s_requirements.txt +++ b/scripts/ci/kubernetes/k8s_requirements.txt @@ -1,3 +1,3 @@ --e .[devel-devscripts,devel-tests,cncf.kubernetes] +-e .[devel-devscripts,devel-tests,cncf.kubernetes,sqlite] -e ./providers -e ./task_sdk diff --git a/tests/utils/test_session.py b/tests/utils/test_session.py index 82d0a00a26d9..ff19c0ca6dad 100644 --- a/tests/utils/test_session.py +++ b/tests/utils/test_session.py @@ -18,7 +18,9 @@ from __future__ import annotations import pytest +from sqlalchemy import select +from airflow.models import Log from airflow.utils.session import provide_session pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -53,3 +55,13 @@ def test_provide_session_with_kwargs(self): session = object() assert wrapper(session=session) is session + + @pytest.mark.asyncio + async def test_async_session(self): + from airflow.settings import create_async_session + + session = create_async_session() + session.add(Log(event="hihi1234")) + await session.commit() + l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741 + 
assert l.event == "hihi1234"