feat(ingest/airflow): airflow plugin v2 #8853
Overall looks good to me. I think this could be more closely integrated with OL, but I understand if that's not desired / could be restrictive in the future.
@@ -18,4 +18,18 @@ def get_provider_info():
"package-name": f"{__package_name__}",
"name": f"{__package_name__}",
Could probably pick a more human-friendly name here
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/__init__.py
Outdated
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/__init__.py
Outdated
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
@contextlib.contextmanager
def _patch_extractors(self):
    with contextlib.ExitStack() as stack:
This is pretty cool, kinda reminds me of Go's `defer`
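For readers who haven't used it, the comparison can be sketched roughly as below. This is only an illustration, not the plugin's code: `Registry`, `patch_registry`, and `cleanup_order` are hypothetical names, and `unittest.mock.patch.object` stands in for whatever patching the real `_patch_extractors` performs.

```python
import contextlib
from unittest import mock


class Registry:
    """Hypothetical stand-in for an object whose attributes get patched."""

    sql_extractor = "default-sql"
    bash_extractor = "default-bash"


@contextlib.contextmanager
def patch_registry(overrides):
    # Every patch is entered on the stack. When the block exits, normally or
    # through an exception, the stack undoes the patches in reverse order.
    with contextlib.ExitStack() as stack:
        for name, value in overrides.items():
            stack.enter_context(mock.patch.object(Registry, name, value))
        yield


def cleanup_order():
    # Callbacks registered on the stack run last-in, first-out on exit,
    # which is the part that resembles stacked `defer` statements in Go.
    events = []
    with contextlib.ExitStack() as stack:
        stack.callback(events.append, "cleanup A")
        stack.callback(events.append, "cleanup B")
        events.append("body")
    return events
```

Inside `with patch_registry({"sql_extractor": "custom-sql"}):` the attribute is overridden, and it is restored once the block ends.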
datajob.inlets.extend(original_datajob.inlets)
datajob.outlets.extend(original_datajob.outlets)
datajob.fine_grained_lineages.extend(original_datajob.fine_grained_lineages)
How come you build up local vars `input_urns`, `output_urns`, and `fine_grained_lineages` above, but then directly update the datajob here? Could we be consistent here?
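One way to read the suggestion is sketched below: fold the preserved lineage into the local lists first, then update the datajob in a single place. This is a guess at the intent, not the PR's code. `JobSketch` and `apply_lineage` are hypothetical, and the real datajob type and the way the locals are built are not shown in this thread.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class JobSketch:
    """Hypothetical stand-in for the datajob object in the diff."""

    inlets: List[str] = field(default_factory=list)
    outlets: List[str] = field(default_factory=list)
    fine_grained_lineages: List[str] = field(default_factory=list)


def apply_lineage(datajob, original, input_urns, output_urns, fine_grained_lineages):
    # Merge the lineage preserved on the original job into the locals first.
    input_urns = [*input_urns, *original.inlets]
    output_urns = [*output_urns, *original.outlets]
    fine_grained_lineages = [
        *fine_grained_lineages,
        *original.fine_grained_lineages,
    ]

    # The datajob is then mutated in exactly one place.
    datajob.inlets.extend(input_urns)
    datajob.outlets.extend(output_urns)
    datajob.fine_grained_lineages.extend(fine_grained_lineages)
    return datajob
```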
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
Outdated
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py
# TODO: Add handling for Airflow mapped tasks using task_instance.map_index

datajob.emit(self.emitter, callback=self._make_emit_callback())
Doesn't matter for this PR, but I'd prefer something like `self.emitter.emit_entity(datajob, callback)` to more closely match our mcp emit call
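The shape being proposed might look something like the sketch below. Everything here is hypothetical: `EmitterSketch`, `EntitySketch`, `generate_mcps`, and the `(error, message)` callback signature are assumptions made for illustration, and `emit_entity` is the reviewer's suggested name rather than an existing method.

```python
from typing import Callable, Dict, Iterable, List, Optional

Callback = Callable[[Optional[Exception], str], None]


class EntitySketch:
    """Hypothetical entity that can describe itself as a series of proposals."""

    def __init__(self, urn: str, aspects: List[str]) -> None:
        self.urn = urn
        self.aspects = aspects

    def generate_mcps(self) -> Iterable[Dict[str, str]]:
        for aspect in self.aspects:
            yield {"urn": self.urn, "aspect": aspect}


class EmitterSketch:
    """Hypothetical emitter that owns the emit loop, instead of the entity."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, str]] = []

    def emit(self, mcp: Dict[str, str], callback: Optional[Callback] = None) -> None:
        self.sent.append(mcp)
        if callback is not None:
            callback(None, f"emitted {mcp['aspect']} for {mcp['urn']}")

    def emit_entity(self, entity: EntitySketch, callback: Optional[Callback] = None) -> None:
        # The entity only produces proposals; sending them stays on the emitter,
        # so entity emission mirrors the single-proposal emit call.
        for mcp in entity.generate_mcps():
            self.emit(mcp, callback)
```

With this layout a caller writes `emitter.emit_entity(datajob, callback)` rather than `datajob.emit(emitter, callback=...)`.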
yup I agree - I'm not super happy with the whole datajob/dataflow interfaces
Would previously see this from the plugin manager.
```
Exception: The package 'acryl-datahub' from setuptools and acryl-datahub-airflow-plugin do not match. Please make sure they are aligned
```
Also fixes + registers hook types
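The wording of that exception suggests a check that compares the installed distribution name against the `package-name` a provider declares in its provider info. A rough sketch of that kind of check follows, assuming this reading is right. `check_provider_package_name` is a hypothetical helper, not Airflow's actual implementation.

```python
def check_provider_package_name(distribution_name, provider_info):
    """Raise if the declared provider package name differs from the distribution.

    Hypothetical helper: `distribution_name` is the name the package was
    installed under, and `provider_info` is the dict a provider returns.
    """
    declared = provider_info["package-name"]
    if distribution_name != declared:
        raise Exception(
            f"The package '{distribution_name}' from setuptools and "
            f"{declared} do not match. Please make sure they are aligned"
        )
    return declared
```

Under this reading, a provider whose declared `package-name` matches the distribution it ships in passes the check, and any mismatch produces the message quoted above.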
- continue making tweaks
- refactor ti + config
- refactor get_lineage_config
- refactoring
- add in special case handling from backport
- listener-based plugin
- fix hooks + add log_level config
- make logging work
- more sql fixes
- start working on combining lineage
- work with old datahub lineage entities
- support fine grained lineage
- support enable_extractors
- tweak stuff
- preserve datajob info
- fix bugs in emitter
- support threaded exec
- fix rendering
- refactoring
- more refactoring
- tweak comment
- review comments
- mypy
- fix mypy errors
- drop python 3.7
- fix lint
- fix lint
b630852 to b4545cc
Testing looks good to me
- Airflow DAG and tasks, including properties, ownership, and tags.
- Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
Oh nice these are most of the improvements we were looking for
The Airflow lineage plugin is only supported with Airflow version >= 2.0.2 or on MWAA with an Airflow version >= 2.0.2.
There's two actively supported implementations of the plugin, with different Airflow version support.
How long do we intend to support both?
probably for a while - there's no good reason to drop support yet
Stacked on top of #8861. Also requires the changes from #8897. Closes #8892.
Closes #6556.
Closes #7809.
Checklist