Redact extra fields in asset endpoints for FastAPI
vatsrahul1001 committed on Nov 15, 2024 · commit 81699ab · 1 parent 5c442d3
Showing 2 changed files with 166 additions and 9 deletions.
12 changes: 12 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
@@ -21,6 +21,8 @@

from pydantic import BaseModel, Field, field_validator

from airflow.utils.log.secrets_masker import redact


class DagScheduleAssetReference(BaseModel):
"""DAG schedule reference serializer for assets."""
@@ -58,6 +60,11 @@ class AssetResponse(BaseModel):
producing_tasks: list[TaskOutletAssetReference]
aliases: list[AssetAliasSchema]

@field_validator("extra", mode="after")
@classmethod
def redact_extra(cls, v: dict):
return redact(v)


class AssetCollectionResponse(BaseModel):
"""Asset collection response."""
Expand Down Expand Up @@ -93,6 +100,11 @@ class AssetEventResponse(BaseModel):
created_dagruns: list[DagRunAssetReference]
timestamp: datetime

@field_validator("extra", mode="after")
@classmethod
def redact_extra(cls, v: dict):
return redact(v)


class AssetEventCollectionResponse(BaseModel):
"""Asset event collection response."""
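The two validators above are the entire change to the response models. Pydantic runs them after field validation, so the extra dict is masked when the response model is built from the ORM row; the value stored in the database is left untouched. A minimal, self-contained sketch of the pattern, using a simplified stand-in for airflow.utils.log.secrets_masker.redact (the real helper consults Airflow's configured list of sensitive field names and handles nested values), looks like this:

from pydantic import BaseModel, field_validator

# Simplified stand-in for Airflow's redact(); the key set here is an assumption.
SENSITIVE_KEYS = {"password", "secret", "api_key"}


def redact(value: dict) -> dict:
    return {k: ("***" if k.lower() in SENSITIVE_KEYS else v) for k, v in value.items()}


class AssetResponseSketch(BaseModel):
    uri: str
    extra: dict

    @field_validator("extra", mode="after")
    @classmethod
    def redact_extra(cls, v: dict) -> dict:
        # Runs on model construction, so only the API response object is masked.
        return redact(v)


print(AssetResponseSketch(uri="s3://bucket/key/1", extra={"password": "bar"}).extra)
# -> {'password': '***'}

Because the masking lives in the datamodels rather than in individual route handlers, every endpoint that serializes AssetResponse or AssetEventResponse picks it up automatically.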
163 changes: 154 additions & 9 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -53,6 +53,22 @@ def _create_assets(session, num: int = 2) -> None:
session.commit()


def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
default_time = "2020-06-11T18:00:00+00:00"
assets = [
AssetModel(
id=i,
uri=f"s3://bucket/key/{i}",
extra={"password": "bar"},
created_at=timezone.parse(default_time),
updated_at=timezone.parse(default_time),
)
for i in range(1, 1 + num)
]
session.add_all(assets)
session.commit()


def _create_provided_asset(session, asset: AssetModel) -> None:
session.add(asset)
session.commit()
@@ -76,6 +92,24 @@ def _create_assets_events(session, num: int = 2) -> None:
session.commit()


def _create_assets_events_with_sensitive_extra(session, num: int = 2) -> None:
default_time = "2020-06-11T18:00:00+00:00"
assets_events = [
AssetEvent(
id=i,
asset_id=i,
extra={"password": "bar"},
source_task_id="source_task_id",
source_dag_id="source_dag_id",
source_run_id=f"source_run_id_{i}",
timestamp=timezone.parse(default_time),
)
for i in range(1, 1 + num)
]
session.add_all(assets_events)
session.commit()


def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
session.add(asset_event)
session.commit()
@@ -119,6 +153,15 @@ def setup(self) -> None:
clear_db_assets()
clear_db_runs()

@pytest.fixture
def time_freezer(self) -> Generator:
freezer = time_machine.travel(self.default_time, tick=False)
freezer.start()

yield

freezer.stop()
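This is the same clock-freezing pattern that TestPostAssetEvents used to define locally, now shared by every test class deriving from TestAssets. For reference, a minimal sketch of the underlying time_machine usage in context-manager form (the frozen instant mirrors the default_time used throughout these tests):

import datetime as dt

import time_machine

frozen = dt.datetime(2020, 6, 11, 18, 0, tzinfo=dt.timezone.utc)

# tick=False pins the clock at the destination instead of letting it advance.
with time_machine.travel(frozen, tick=False):
    assert dt.datetime.now(dt.timezone.utc) == frozen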

def teardown_method(self) -> None:
clear_db_assets()
clear_db_runs()
@@ -127,6 +170,10 @@ def teardown_method(self) -> None:
def create_assets(self, session, num: int = 2):
_create_assets(session=session, num=num)

@provide_session
def create_assets_with_sensitive_extra(self, session, num: int = 2):
_create_assets_with_sensitive_extra(session=session, num=num)

@provide_session
def create_provided_asset(self, session, asset: AssetModel):
_create_provided_asset(session=session, asset=asset)
@@ -135,6 +182,10 @@ def create_provided_asset(self, session, asset: AssetModel):
def create_assets_events(self, session, num: int = 2):
_create_assets_events(session=session, num=num)

@provide_session
def create_assets_events_with_sensitive_extra(self, session, num: int = 2):
_create_assets_events_with_sensitive_extra(session=session, num=num)

@provide_session
def create_provided_asset_event(self, session, asset_event: AssetEvent):
_create_provided_asset_event(session=session, asset_event=asset_event)
@@ -424,6 +475,68 @@ def test_limit_and_offset(self, test_client, params, expected_asset_uris):
asset_uris = [asset["uri"] for asset in response.json()["asset_events"]]
assert asset_uris == expected_asset_uris

@pytest.mark.usefixtures("time_freezer")
@pytest.mark.enable_redact
def test_should_mask_sensitive_extra_logs(self, test_client, session):
self.create_assets_with_sensitive_extra()
self.create_assets_events_with_sensitive_extra()
self.create_dag_run()
self.create_asset_dag_run()
response = test_client.get("/public/assets/events")
assert response.status_code == 200
response_data = response.json()
assert response_data == {
"asset_events": [
{
"id": 1,
"asset_id": 1,
"uri": "s3://bucket/key/1",
"extra": {"password": "***"},
"source_task_id": "source_task_id",
"source_dag_id": "source_dag_id",
"source_run_id": "source_run_id_1",
"source_map_index": -1,
"created_dagruns": [
{
"run_id": "source_run_id_1",
"dag_id": "source_dag_id",
"logical_date": "2020-06-11T18:00:00Z",
"start_date": "2020-06-11T18:00:00Z",
"end_date": "2020-06-11T18:00:00Z",
"state": "success",
"data_interval_start": "2020-06-11T18:00:00Z",
"data_interval_end": "2020-06-11T18:00:00Z",
}
],
"timestamp": "2020-06-11T18:00:00Z",
},
{
"id": 2,
"asset_id": 2,
"uri": "s3://bucket/key/2",
"extra": {"password": "***"},
"source_task_id": "source_task_id",
"source_dag_id": "source_dag_id",
"source_run_id": "source_run_id_2",
"source_map_index": -1,
"created_dagruns": [
{
"run_id": "source_run_id_2",
"dag_id": "source_dag_id",
"logical_date": "2020-06-11T18:00:00Z",
"start_date": "2020-06-11T18:00:00Z",
"end_date": "2020-06-11T18:00:00Z",
"state": "success",
"data_interval_start": "2020-06-11T18:00:00Z",
"data_interval_end": "2020-06-11T18:00:00Z",
}
],
"timestamp": "2020-06-11T18:00:00Z",
},
],
"total_entries": 2,
}


class TestGetAssetEndpoint(TestAssets):
@pytest.mark.parametrize(
@@ -463,17 +576,29 @@ def test_should_respond_404(self, test_client):
assert response.status_code == 404
assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"

@pytest.mark.usefixtures("time_freezer")
@pytest.mark.enable_redact
def test_should_mask_sensitive_extra_logs(self, test_client, session):
self.create_assets_with_sensitive_extra()
tz_datetime_format = self.default_time.replace("+00:00", "Z")
uri = "s3://bucket/key/1"
response = test_client.get(
f"/public/assets/{uri}",
)
assert response.status_code == 200
assert response.json() == {
"id": 1,
"uri": "s3://bucket/key/1",
"extra": {"password": "***"},
"created_at": tz_datetime_format,
"updated_at": tz_datetime_format,
"consuming_dags": [],
"producing_tasks": [],
"aliases": [],
}

class TestPostAssetEvents(TestAssets):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session):
self.create_assets()
@@ -499,3 +624,23 @@ def test_invalid_attr_not_allowed(self, test_client, session):
response = test_client.post("/public/assets/events", json=event_invalid_payload)

assert response.status_code == 422

@pytest.mark.usefixtures("time_freezer")
@pytest.mark.enable_redact
def test_should_mask_sensitive_extra_logs(self, test_client, session):
self.create_assets()
event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
response = test_client.post("/public/assets/events", json=event_payload)
assert response.status_code == 200
assert response.json() == {
"id": mock.ANY,
"asset_id": 1,
"uri": "s3://bucket/key/1",
"extra": {"password": "***", "from_rest_api": True},
"source_task_id": None,
"source_dag_id": None,
"source_run_id": None,
"source_map_index": -1,
"created_dagruns": [],
"timestamp": self.default_time.replace("+00:00", "Z"),
}
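The expected payload above also shows that the masking is key-selective: the from_rest_api flag that the endpoint adds to extra for events created over the API passes through unchanged, while password is replaced. A short illustration (the sensitive-key set is an assumption; Airflow's real list comes from its secrets masker):

SENSITIVE_KEYS = {"password", "secret", "api_key"}
extra = {"password": "bar", "from_rest_api": True}
print({k: ("***" if k in SENSITIVE_KEYS else v) for k, v in extra.items()})
# -> {'password': '***', 'from_rest_api': True}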
