Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-82 Save references between assets and triggers #43826

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

vincbeck
Copy link
Contributor

@vincbeck vincbeck commented Nov 8, 2024

Resolves #42510.

This PR adds a new attributes watchers to the Asset class and saves references between assets and triggers in the DB. For example:

trigger = SqsSensorTrigger(sqs_queue="my_queue")
asset = Asset("example_asset_watchers", watchers=[trigger])

with DAG(
    dag_id="example_dataset_watcher",
    schedule=[asset],
    catchup=False,
):
    task = EmptyOperator(task_id="task",)

    chain(task)

This PR creates the trigger in the DB if it does not exist and save the reference between asset and trigger.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler area:task-sdk labels Nov 8, 2024
@vincbeck vincbeck force-pushed the vincbeck/aip-82-save-references branch from 947e028 to 598bc69 Compare November 8, 2024 16:01
@vincbeck
Copy link
Contributor Author

vincbeck commented Nov 8, 2024

@Lee-W @uranusjr When working on it I realized that assets are added in the DB from DAG definition but never removed (or at least I did not see the code). Meaning, as a DAG author if I define an asset in my DAG and then later on remove it, the asset is never removed from the DB. Am I wrong? If not, is it intended?

@Lee-W
Copy link
Member

Lee-W commented Nov 9, 2024

@Lee-W @uranusjr When working on it I realized that assets are added in the DB from DAG definition but never removed (or at least I did not see the code). Meaning, as a DAG author if I define an asset in my DAG and then later on remove it, the asset is never removed from the DB. Am I wrong? If not, is it intended?

Yep, this is by design as of now. To keep the asset history.

@vincbeck vincbeck force-pushed the vincbeck/aip-82-save-references branch 2 times, most recently from d27cab5 to 682b713 Compare November 12, 2024 15:29
@vincbeck
Copy link
Contributor Author

@Lee-W @uranusjr When working on it I realized that assets are added in the DB from DAG definition but never removed (or at least I did not see the code). Meaning, as a DAG author if I define an asset in my DAG and then later on remove it, the asset is never removed from the DB. Am I wrong? If not, is it intended?

Yep, this is by design as of now. To keep the asset history.

Alright, thank you. I handled it then. I removed the references from asset and triggers if the asset is no longer used

@vincbeck
Copy link
Contributor Author

@Lee-W Any chance you can review it? You have some experience around assets that could be interesting to have :)

@Lee-W
Copy link
Member

Lee-W commented Nov 14, 2024

@Lee-W Any chance you can review it? You have some experience around assets that could be interesting to have :)

Sure thing :) Will take a look later today

airflow/assets/__init__.py Outdated Show resolved Hide resolved
airflow/assets/__init__.py Outdated Show resolved Hide resolved
airflow/dag_processing/collection.py Outdated Show resolved Hide resolved
# Create the trigger in the DB if it does not exist
if not trigger_model:
trigger_model = Trigger.from_object(trigger_class_path_to_asset_dict[trigger_class_path])
session.add(trigger_model)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether collect all the models together and use add_all would be better 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collecting all model objects first is cleaner code IMO; this loop + add + append approach is a lot more difficult to read. Also the repeated scalar + limit call is not very performant; it is better to select all the existing triggers first in one query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your suggestions, I tried to apply them. Please let me know if this is what you thought

Comment on lines 471 to 477
# Remove references from assets no longer used
all_assets = session.scalars(select(AssetModel))
# orphan_assets = set()
for asset_model in all_assets:
if (asset_model.name, asset_model.uri) not in self.assets:
asset_model.triggers = []
# orphan_assets.add(asset_model.id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this actively? What happens if we just leave those associations there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the trigger will keep updating the asset in cases of events. More importantly, if we keep the association between the asset and the trigger, it will be impossible to clean-up these triggers. I want to be able to remove triggers that are not used (meaning, not associated to a task and an asset). Which means they will keep infinitely pooling an external resource. That could be very costly.

On that same topic, when doing some testing, I noticed that this function is called per DAG (am I wrong?). As a consequence, this piece of code removes the associations I just created before. I need to fix that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@vincbeck vincbeck force-pushed the vincbeck/aip-82-save-references branch from 682b713 to c2660ea Compare November 14, 2024 19:27
@vincbeck vincbeck force-pushed the vincbeck/aip-82-save-references branch 3 times, most recently from fe5d227 to 9543a51 Compare November 14, 2024 20:24
@vincbeck vincbeck force-pushed the vincbeck/aip-82-save-references branch from 9543a51 to c4c5c3e Compare November 14, 2024 20:59
]

# Remove references from assets no longer used
orphan_assets = session.scalars(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr do we need to check AssetActive here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An asset without an AssetActive entry is not referenced anywhere, and trigerring an event to such an asset will therefore simply do nothing. So not checking AssetActive here is not useful in practice, but maybe theoratically a possibility? It depends on what we want the user to be able to do, I guess. @vincbeck Do you think a user should be able to trigger an event on an asset that does not actually exist in any DAGs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I did not know that notion of AssetActive, maybe I could use it.

Do you think a user should be able to trigger an event on an asset that does not actually exist in any DAGs?

Absolutely not, that's what I am doing (or trying to do) here but even further. orphan_assets contains all the assets not used by any DAGs as schedule. In other words, no DAG use an asset in orphan_assets as schedule condition. I am removing all references from these assets since they are not used to schedule DAG. The way I understand it is, all assets with an AssetActive entry right is a subset of orphan_assets ?

*,
group: str = "",
extra: dict | None = None,
watchers: list[BaseTrigger] | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if watcher is a good name for this. What do we expect this to do? If I understand AIP-82 correctly, an external event would fire the trigger, and the trigger would create events for assets associated to it.

Assuming my understanding is correct, the triggers here are not watchers of the asset; rather, the asset watches the triggers. The relationship is the other way around. So it is probably better to call this watch instead? Or maybe this attribute should live on the trigger instead, something like

asset = Asset("example_asset_watchers")

trigger = SqsSensorTrigger(sqs_queue="my_queue", trigger=[asset])

DAG(..., schedule=[asset])

Tell me what you think on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming ... so hard haha. I see your point.

The reason why I called it watchers is because the triggers will watch some external resource and send event on updates. In that sense, to me, the triggers are watchers. I am not strongly again watch if you think it makes more sense. To be very honest, between watchers and watch I dont mind, I think the both of them makes sense.

However, I definitely want the attribute on the asset class, I think it makes more sense and a more deliberate choice for the user to say, I have this asset and I want this asset to be updated when these triggers fire.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler including HA (high availability) scheduler area:task-sdk
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AIP-82. Save references asset <-> triggers when parsing DAGs
3 participants