Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP 84: Migrate POST ASSET EVENT legacy API to fast API. #43984

Merged
merged 56 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
cf5218c
AIP-84: Migrating GET Assets to fastAPI
amoghrajesh Nov 6, 2024
5a49280
matching response to legacy
amoghrajesh Nov 6, 2024
962572b
Adding unit tests - part 1
amoghrajesh Nov 8, 2024
428cb6c
Update airflow/api_fastapi/common/parameters.py
amoghrajesh Nov 8, 2024
a78d3cb
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
882d20c
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
25bb08e
Adding unit tests - part 2
amoghrajesh Nov 8, 2024
658479d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fa0cd23
fixing unit tests & updating parameter type
amoghrajesh Nov 8, 2024
dd791c2
review comments pierre
amoghrajesh Nov 8, 2024
06fa0a7
fixing last commit
amoghrajesh Nov 8, 2024
3bd803b
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fc29d7d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
7a97220
fixing unit tests
amoghrajesh Nov 9, 2024
8b6b09e
migrating get assets events endpoint to fastapi
vatsrahul1001 Nov 11, 2024
b7aaa9d
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 11, 2024
b808895
fixing test response
vatsrahul1001 Nov 11, 2024
1e18c0f
Merge branch 'AIP84-get-asset-events' of github.com:astronomer/airflo…
vatsrahul1001 Nov 11, 2024
52939e5
Adding tests for filtering
vatsrahul1001 Nov 12, 2024
08198f9
Merge branch 'main' of github.com:astronomer/airflow into AIP84-get-a…
vatsrahul1001 Nov 12, 2024
bf68954
Merge branch 'AIP84-get-asset-to-fastapi' of github.com:astronomer/ai…
vatsrahul1001 Nov 12, 2024
674aff9
pushing changes from AIP84-get-asset-to-fastapi
vatsrahul1001 Nov 12, 2024
0fac5ed
pushing changes from AIP84-get-asset-to-fastapi
vatsrahul1001 Nov 12, 2024
991edfd
resoving conflicts with main
vatsrahul1001 Nov 12, 2024
f30956a
resoving conflicts with main
vatsrahul1001 Nov 12, 2024
255dc81
address review comments
vatsrahul1001 Nov 13, 2024
35ed49c
resoving conflicts with main
vatsrahul1001 Nov 13, 2024
1b84e09
fixing test parametrize
vatsrahul1001 Nov 13, 2024
d9f3d2f
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
60b1f2d
updating sort params
vatsrahul1001 Nov 13, 2024
740cde0
Merge branch 'AIP84-get-asset-events' of github.com:astronomer/airflo…
vatsrahul1001 Nov 13, 2024
297d53a
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
e51fee1
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
4870d88
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
7eba6d4
address review comments
vatsrahul1001 Nov 13, 2024
4c1f154
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
a763033
address review comments
vatsrahul1001 Nov 13, 2024
ac74c18
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
bbca07b
AIP84-create-asset-events migration implementation using fastapi
vatsrahul1001 Nov 13, 2024
60cf12c
getting main latest
vatsrahul1001 Nov 14, 2024
cdc0b1d
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 14, 2024
9071559
Mergering latest of AIP84-get-asset-events
vatsrahul1001 Nov 14, 2024
e3abc2c
Merge branch 'main' into AIP84-create-asset-events
vatsrahul1001 Nov 14, 2024
9bab906
Merge branch 'main' into AIP84-create-asset-events
vatsrahul1001 Nov 14, 2024
ebc2b05
Merge branch 'main' of github.com:astronomer/airflow into AIP84-creat…
vatsrahul1001 Nov 14, 2024
fdde5ac
removing typo print command
vatsrahul1001 Nov 14, 2024
f671fce
remove typo print statement
vatsrahul1001 Nov 14, 2024
6c8353c
address review comments
vatsrahul1001 Nov 14, 2024
cb2210d
merging from main
vatsrahul1001 Nov 14, 2024
29b2e3e
Merge branch 'main' into AIP84-create-asset-events
vatsrahul1001 Nov 15, 2024
60c902c
Merge branch 'main' into AIP84-create-asset-events
vatsrahul1001 Nov 15, 2024
45a74d9
address review comments
vatsrahul1001 Nov 15, 2024
691a186
Merge branch 'main' into AIP84-create-asset-events
vatsrahul1001 Nov 15, 2024
8af74db
Update airflow/api_fastapi/core_api/datamodels/assets.py
vatsrahul1001 Nov 15, 2024
733db9a
address review comments
vatsrahul1001 Nov 15, 2024
bb64827
Merge branch 'main' into AIP84-create-asset-events
vatsrahul1001 Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ def delete_asset_queued_events(
)


@mark_fastapi_migration_done
@security.requires_access_asset("POST")
@provide_session
@action_logging
Expand Down
12 changes: 12 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,15 @@ class AssetEventCollectionResponse(BaseModel):

asset_events: list[AssetEventResponse]
total_entries: int


class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""

asset_uri: str
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
extra: dict

class Config:
"""Pydantic config."""

extra = "forbid"
58 changes: 58 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,49 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
post:
tags:
- Asset
summary: Create Asset Events
description: Create asset events.
operationId: create_asset_events
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAssetEventsBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/AssetEventResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
Expand Down Expand Up @@ -4037,6 +4080,21 @@ components:
- message
title: ConnectionTestResponse
description: Connection Test serializer for responses.
CreateAssetEventsBody:
properties:
asset_uri:
type: string
title: Asset Uri
extra:
type: object
title: Extra
additionalProperties: false
type: object
required:
- asset_uri
- extra
title: CreateAssetEventsBody
description: Create asset events request.
DAGCollectionResponse:
properties:
dags:
Expand Down
36 changes: 35 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@
AssetEventCollectionResponse,
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetEvent, AssetModel
from airflow.utils import timezone

assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")

Expand Down Expand Up @@ -86,7 +90,7 @@ def get_assets(

@assets_router.get(
"/events",
responses=create_openapi_http_exception_doc([404]),
responses=create_openapi_http_exception_doc([401, 403, 404]),
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
)
def get_asset_events(
limit: QueryLimit,
Expand Down Expand Up @@ -134,6 +138,36 @@ def get_asset_events(
)


@assets_router.post(
"/events",
responses=create_openapi_http_exception_doc([401, 403, 404]),
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
)
def create_asset_events(
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
create_asset_request: CreateAssetEventsBody,
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
session: Annotated[Session, Depends(get_session)],
) -> AssetEventResponse:
"""Create asset events."""
asset = session.scalar(
select(AssetModel).where(AssetModel.uri == create_asset_request.asset_uri).limit(1)
)
if not asset:
raise HTTPException(404, f"Asset with uri: `{create_asset_request.asset_uri}` was not found")
timestamp = timezone.utcnow()

create_asset_request.extra["from_rest_api"] = True
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved

assets_event = asset_manager.register_asset_change(
asset=Asset(uri=create_asset_request.asset_uri),
timestamp=timestamp,
extra=create_asset_request.extra,
session=session,
)

if not assets_event:
raise HTTPException(404, f"Asset with uri: `{create_asset_request.asset_uri}` was not found")
return AssetEventResponse.model_validate(assets_event, from_attributes=True)


@assets_router.get(
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,9 @@ export const UseXcomServiceGetXcomEntryKeyFn = (
{ dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
]),
];
export type AssetServiceCreateAssetEventsMutationResult = Awaited<
ReturnType<typeof AssetService.createAssetEvents>
>;
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
import {
BackfillPostBody,
ConnectionBody,
CreateAssetEventsBody,
DAGPatchBody,
DAGRunPatchBody,
DagRunState,
Expand Down Expand Up @@ -1694,6 +1695,45 @@ export const useXcomServiceGetXcomEntry = <
}) as TData,
...options,
});
/**
* Create Asset Events
* Create asset events.
* @param data The data for the request.
* @param data.requestBody
* @returns AssetEventResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceCreateAssetEvents = <
TData = Common.AssetServiceCreateAssetEventsMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: CreateAssetEventsBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: CreateAssetEventsBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
AssetService.createAssetEvents({
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Create Backfill
* @param data The data for the request.
Expand Down
18 changes: 18 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,24 @@ export const $ConnectionTestResponse = {
description: "Connection Test serializer for responses.",
} as const;

export const $CreateAssetEventsBody = {
properties: {
asset_uri: {
type: "string",
title: "Asset Uri",
},
extra: {
type: "object",
title: "Extra",
},
},
additionalProperties: false,
type: "object",
required: ["asset_uri", "extra"],
title: "CreateAssetEventsBody",
description: "Create asset events request.",
} as const;

export const $DAGCollectionResponse = {
properties: {
dags: {
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import type {
GetAssetsResponse,
GetAssetEventsData,
GetAssetEventsResponse,
CreateAssetEventsData,
CreateAssetEventsResponse,
GetAssetData,
GetAssetResponse,
HistoricalMetricsData,
Expand Down Expand Up @@ -214,6 +216,31 @@ export class AssetService {
});
}

/**
* Create Asset Events
* Create asset events.
* @param data The data for the request.
* @param data.requestBody
* @returns AssetEventResponse Successful Response
* @throws ApiError
*/
public static createAssetEvents(
data: CreateAssetEventsData,
): CancelablePromise<CreateAssetEventsResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/assets/events",
body: data.requestBody,
mediaType: "application/json",
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}

/**
* Get Asset
* Get an asset.
Expand Down
41 changes: 41 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ export type ConnectionTestResponse = {
message: string;
};

/**
* Create asset events request.
*/
export type CreateAssetEventsBody = {
asset_uri: string;
extra: {
[key: string]: unknown;
};
};

/**
* DAG Collection serializer for responses.
*/
Expand Down Expand Up @@ -984,6 +994,12 @@ export type GetAssetEventsData = {

export type GetAssetEventsResponse = AssetEventCollectionResponse;

export type CreateAssetEventsData = {
requestBody: CreateAssetEventsBody;
};

export type CreateAssetEventsResponse = AssetEventResponse;

export type GetAssetData = {
uri: string;
};
Expand Down Expand Up @@ -1503,6 +1519,31 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
post: {
req: CreateAssetEventsData;
res: {
/**
* Successful Response
*/
200: AssetEventResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/assets/{uri}": {
get: {
Expand Down
30 changes: 30 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import urllib
from unittest import mock

import pytest

Expand Down Expand Up @@ -459,3 +460,32 @@ def test_should_respond_404(self, test_client):
)
assert response.status_code == 404
assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"


class TestPostAssetEvents(TestAssets):
def test_should_respond_200(self, test_client, session):
self.create_assets()
event_payload = {"asset_uri": "s3://bucket/key/1", "extra": {"foo": "bar"}}
response = test_client.post("/public/assets/events", json=event_payload)

assert response.status_code == 200

assert response.json() == {
"id": mock.ANY,
"asset_id": mock.ANY,
"uri": "s3://bucket/key/1",
"extra": {"foo": "bar", "from_rest_api": True},
"source_task_id": mock.ANY,
"source_dag_id": mock.ANY,
"source_run_id": mock.ANY,
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
"source_map_index": -1,
"created_dagruns": [],
"timestamp": mock.ANY,
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
}
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved

def test_invalid_attr_not_allowed(self, test_client, session):
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
self.create_assets()
event_invalid_payload = {"asset_uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, "fake": {}}
response = test_client.post("/public/assets/events", json=event_invalid_payload)

assert response.status_code == 422