-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Start porting DAG definition code to the Task SDK #43076
Conversation
32a6f3f
to
74dd8f9
Compare
This is very much at the "boring ground work stage" - once we've got this covered we can add some of the execution changes which is not of the n"execution interface" side where it gets fun |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments. In general looks great!
A bit of a challenge to review such a big PR - and I assume it is just the starting :-D - if possible would be cool to split this a bit up for better review, e.g.
- One PR just the skeleton and AbstractOperator or so
- One PR for BaseOperator
- One PR for DAG only
- Maybe one for Nodes/TaskGroup etc clubbing the rest together.
Anyway after some time was able to digest. Would offer also a full review if needed. And please don't take my early comments too serious more or less thinking out loud.
@kaxil @jscheffl I've just pushed latest changes to this. Probably not worth looking at it again until I've finished it and have a decent idea that the tests are passing locally, but in the last commit I've just pushed I have removed the old code I've ported over, so it might be slightly clearer where I've just moved code. |
2657f36
to
22566d8
Compare
This comment was marked as outdated.
This comment was marked as outdated.
c0a7b96
to
fd1085d
Compare
29f9cda
to
d4e015a
Compare
d4e015a
to
acdfa2c
Compare
5bb8796
to
2301523
Compare
03282b4
to
11e4304
Compare
This comment was marked as outdated.
This comment was marked as outdated.
c0d765b
to
3acfb5c
Compare
While debugging test failures on apache#43076, found that this docstring was wrong, most likely a copy/paste error of `ti._handle_failure`
While debugging test failures on #43076, found that this docstring was wrong, most likely a copy/paste error of `ti._handle_failure`
fc8645e
to
6768541
Compare
…ons in base classes Fix was pydantic/pydantic#8751
Since the scheduler needs it (at least for now), we need to ensure that we include it in our prod-image builds in CI
a94684a
to
1ef4b33
Compare
There are lots more things to move over to the TaskSDK before AIP-72 will be complete which we will track in the project board, but since this is the big base, and slow to test due to unavoidable wide reaching changes we will merge this and follow up with future changes for the rest (things like Mapped operator, DagParam etc) |
Merging this, one of the failures around k8s test that took longer than an hour looks like a transient one:
If for some reason it fails on main too, we will debug it |
Fixing one of the TODOs: apache#43076 This was already addressed.
Fixing one of the TODOs: #43076 This was already addressed.
While debugging test failures on apache#43076, found that this docstring was wrong, most likely a copy/paste error of `ti._handle_failure`
closes apache#43011 By "definition code" we mean anything needed at definition/parse time, leaving anything to do with scheduling time decisions in Airflow's core. Also in this PR I have _attempted_ to keep it to only porting defintiion code for simple DAGs, leaving anything to do with mapped tasks or execution time in core for now, but a few things "leaked" across. And as the goal of this PR is to go from working state to working state some of the code in Task SDK still imports from "core" (various types, enums or helpers) that will need to be resolved before 3.0 release, but it is fine for now. I'm also aware that the class hierarchy with airflow.models.baseoperator.BaseOperator (and to a lesser extend with DAG) in particular is very messy right now, and we will need to think how we want to add on the scheduling-time functions etc, as I'm not yet sold that having Core Airflow depend upon the Task-SDK classes/import the code is the right structure, but we can address that later We will also need to addresses the rendered docs for the Task SDK in a future PR -- the goal is that "anything" exposed on `airflow.sdk` directly is part of the public API, but right now the renedered docs show DAG as `airflow.sdk.definitions.dag.DAG` which is certainly not what we want users to see. Co-authored-by: Kaxil Naik <[email protected]>
Fixing one of the TODOs: apache#43076 This was already addressed.
By "definition code" we mean anything needed at definition/parse time, leaving
anything to do with scheduling time decisions in Airflow's core.
Also in this PR I have attempted to keep it to only porting defintiion code
for simple DAGs, leaving anything to do with mapped tasks or execution time in
core for now, but a few things "leaked" across.
And as the goal of this PR is to go from working state to working state some
of the code in Task SDK still imports from "core" (various types, enums or
helpers) that will need to be resolved before 3.0 release, but it is fine for
now.
I'm also aware that the class hierarchy with
airflow.models.baseoperator.BaseOperator (and to a lesser extend with DAG) in
particular is very messy right now, and we will need to think how we want to
add on the scheduling-time functions etc, as I'm not yet sold that having Core
Airflow depend upon the Task-SDK classes/import the code is the right
structure, but we can address that later
We will also need to addresses the rendered docs for the Task SDK in a future
PR -- the goal is that "anything" exposed on
airflow.sdk
directly is part ofthe public API, but right now the renedered docs show DAG as
airflow.sdk.definitions.dag.DAG
which is certainly not what we want users tosee.
closes #43011