From 81699ab4939ea7b2555d110147ff153c63d8e7e4 Mon Sep 17 00:00:00 2001
From: vatsrahul1001
Date: Sat, 16 Nov 2024 00:17:45 +0530
Subject: [PATCH] redact extra fields in asset endpoints for FastAPI

---
 .../api_fastapi/core_api/datamodels/assets.py |  12 ++
 .../core_api/routes/public/test_assets.py     | 163 +++++++++++++++++-
 2 files changed, 166 insertions(+), 9 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py
index e5ac10715ed4..8e8a4eac5f52 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -21,6 +21,8 @@
 
 from pydantic import BaseModel, Field, field_validator
 
+from airflow.utils.log.secrets_masker import redact
+
 
 class DagScheduleAssetReference(BaseModel):
     """DAG schedule reference serializer for assets."""
@@ -58,6 +60,11 @@ class AssetResponse(BaseModel):
     producing_tasks: list[TaskOutletAssetReference]
     aliases: list[AssetAliasSchema]
 
+    @field_validator("extra", mode="after")
+    @classmethod
+    def redact_extra(cls, v: dict):
+        return redact(v)
+
 
 class AssetCollectionResponse(BaseModel):
     """Asset collection response."""
@@ -93,6 +100,11 @@ class AssetEventResponse(BaseModel):
     created_dagruns: list[DagRunAssetReference]
     timestamp: datetime
 
+    @field_validator("extra", mode="after")
+    @classmethod
+    def redact_extra(cls, v: dict):
+        return redact(v)
+
 
 class AssetEventCollectionResponse(BaseModel):
     """Asset event collection response."""
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 42b7acd908ff..0d43a73b3fe1 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -53,6 +53,22 @@ def _create_assets(session, num: int = 2) -> None:
     session.commit()
 
 
+def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
+    default_time = "2020-06-11T18:00:00+00:00"
+    assets = [
+        AssetModel(
+            id=i,
+            uri=f"s3://bucket/key/{i}",
+            extra={"password": "bar"},
+            created_at=timezone.parse(default_time),
+            updated_at=timezone.parse(default_time),
+        )
+        for i in range(1, 1 + num)
+    ]
+    session.add_all(assets)
+    session.commit()
+
+
 def _create_provided_asset(session, asset: AssetModel) -> None:
     session.add(asset)
     session.commit()
@@ -76,6 +92,24 @@ def _create_assets_events(session, num: int = 2) -> None:
     session.commit()
 
 
+def _create_assets_events_with_sensitive_extra(session, num: int = 2) -> None:
+    default_time = "2020-06-11T18:00:00+00:00"
+    assets_events = [
+        AssetEvent(
+            id=i,
+            asset_id=i,
+            extra={"password": "bar"},
+            source_task_id="source_task_id",
+            source_dag_id="source_dag_id",
+            source_run_id=f"source_run_id_{i}",
+            timestamp=timezone.parse(default_time),
+        )
+        for i in range(1, 1 + num)
+    ]
+    session.add_all(assets_events)
+    session.commit()
+
+
 def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
     session.add(asset_event)
     session.commit()
@@ -119,6 +153,15 @@ def setup(self) -> None:
         clear_db_assets()
         clear_db_runs()
 
+    @pytest.fixture
+    def time_freezer(self) -> Generator:
+        freezer = time_machine.travel(self.default_time, tick=False)
+        freezer.start()
+
+        yield
+
+        freezer.stop()
+
     def teardown_method(self) -> None:
         clear_db_assets()
         clear_db_runs()
@@ -127,6 +170,10 @@ def teardown_method(self) -> None:
     def create_assets(self, session, num: int = 2):
         _create_assets(session=session, num=num)
 
+    @provide_session
+    def create_assets_with_sensitive_extra(self, session, num: int = 2):
+        _create_assets_with_sensitive_extra(session=session, num=num)
+
     @provide_session
     def create_provided_asset(self, session, asset: AssetModel):
         _create_provided_asset(session=session, asset=asset)
@@ -135,6 +182,10 @@ def create_provided_asset(self, session, asset: AssetModel):
     def create_assets_events(self, session, num: int = 2):
         _create_assets_events(session=session, num=num)
 
+    @provide_session
+    def create_assets_events_with_sensitive_extra(self, session, num: int = 2):
+        _create_assets_events_with_sensitive_extra(session=session, num=num)
+
     @provide_session
     def create_provided_asset_event(self, session, asset_event: AssetEvent):
         _create_provided_asset_event(session=session, asset_event=asset_event)
@@ -424,6 +475,68 @@ def test_limit_and_offset(self, test_client, params, expected_asset_uris):
         asset_uris = [asset["uri"] for asset in response.json()["asset_events"]]
         assert asset_uris == expected_asset_uris
 
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra_logs(self, test_client, session):
+        self.create_assets_with_sensitive_extra()
+        self.create_assets_events_with_sensitive_extra()
+        self.create_dag_run()
+        self.create_asset_dag_run()
+        response = test_client.get("/public/assets/events")
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data == {
+            "asset_events": [
+                {
+                    "id": 1,
+                    "asset_id": 1,
+                    "uri": "s3://bucket/key/1",
+                    "extra": {"password": "***"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_1",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_1",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+                {
+                    "id": 2,
+                    "asset_id": 2,
+                    "uri": "s3://bucket/key/2",
+                    "extra": {"password": "***"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_2",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_2",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+            ],
+            "total_entries": 2,
+        }
+
 
 class TestGetAssetEndpoint(TestAssets):
     @pytest.mark.parametrize(
@@ -463,17 +576,29 @@ def test_should_respond_404(self, test_client):
         assert response.status_code == 404
         assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"
 
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra_logs(self, test_client, session):
+        self.create_assets_with_sensitive_extra()
+        tz_datetime_format = self.default_time.replace("+00:00", "Z")
+        uri = "s3://bucket/key/1"
+        response = test_client.get(
+            f"/public/assets/{uri}",
+        )
+        assert response.status_code == 200
+        assert response.json() == {
+            "id": 1,
+            "uri": "s3://bucket/key/1",
+            "extra": {"password": "***"},
+            "created_at": tz_datetime_format,
+            "updated_at": tz_datetime_format,
+            "consuming_dags": [],
+            "producing_tasks": [],
+            "aliases": [],
+        }
 
-class TestPostAssetEvents(TestAssets):
-    @pytest.fixture
-    def time_freezer(self) -> Generator:
-        freezer = time_machine.travel(self.default_time, tick=False)
-        freezer.start()
-
-        yield
-
-        freezer.stop()
 
+class TestPostAssetEvents(TestAssets):
     @pytest.mark.usefixtures("time_freezer")
     def test_should_respond_200(self, test_client, session):
         self.create_assets()
@@ -499,3 +624,23 @@ def test_invalid_attr_not_allowed(self, test_client, session):
 
         response = test_client.post("/public/assets/events", json=event_invalid_payload)
         assert response.status_code == 422
+
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra_logs(self, test_client, session):
+        self.create_assets()
+        event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
+        response = test_client.post("/public/assets/events", json=event_payload)
+        assert response.status_code == 200
+        assert response.json() == {
+            "id": mock.ANY,
+            "asset_id": 1,
+            "uri": "s3://bucket/key/1",
+            "extra": {"password": "***", "from_rest_api": True},
+            "source_task_id": None,
+            "source_dag_id": None,
+            "source_run_id": None,
+            "source_map_index": -1,
+            "created_dagruns": [],
+            "timestamp": self.default_time.replace("+00:00", "Z"),
+        }