feat(integration/prefect): prefect block to emit prefect pipeline #8414

Closed
Changes from 26 commits
30 commits
978bfbb
Prefect source integration code added
shubhamjagtap639 Jul 13, 2023
a6f2a5e
prefect-dataub package integrated with datahub
shubhamjagtap639 Jul 13, 2023
e84c1ac
Prefect doc Spell mistake corrected
shubhamjagtap639 Jul 13, 2023
2bef6cb
Remove not necessary md file
shubhamjagtap639 Jul 13, 2023
0910f09
Version added for some pakages in prefect-datahub
shubhamjagtap639 Jul 14, 2023
edfead9
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Jul 14, 2023
5efc31e
Prefect version 2.0.0 restriction removed
shubhamjagtap639 Jul 14, 2023
745515c
Merge branch 'prefect-source-integration' of https://github.com/shubh…
shubhamjagtap639 Jul 14, 2023
43bf87f
Prefect version set to >=2.0.0
shubhamjagtap639 Jul 17, 2023
8b027a1
prefect-datahub build error fixed for python 3.7
shubhamjagtap639 Jul 17, 2023
a4ed11a
mypy stubs packages added
shubhamjagtap639 Jul 17, 2023
0949263
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Jul 17, 2023
0a7d63e
acryl-datahub package added
shubhamjagtap639 Jul 18, 2023
3d17c79
Merge branch 'prefect-source-integration' of https://github.com/shubh…
shubhamjagtap639 Jul 18, 2023
1f5b9ca
Added some missing required setup files
shubhamjagtap639 Jul 18, 2023
3976323
Extra packages added in requirements-dev
shubhamjagtap639 Jul 18, 2023
70c298a
Added some extra packages
shubhamjagtap639 Jul 18, 2023
a142203
temp changes
shubhamjagtap639 Jul 18, 2023
6612458
Revert temp changes
shubhamjagtap639 Jul 18, 2023
1b861fc
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Aug 8, 2023
be8c35a
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Sep 11, 2023
2075ac2
Prefect plugin code modified as per latest airflow plugin code
shubhamjagtap639 Sep 13, 2023
6b4ff59
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Sep 13, 2023
e4299d4
Add epoch 1 for dev build versions
shubhamjagtap639 Sep 13, 2023
cfde3ae
build error fixed
shubhamjagtap639 Sep 13, 2023
f963b55
syntax error resolved
shubhamjagtap639 Sep 13, 2023
2ba316e
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Feb 5, 2024
5dbebbc
Merge branch 'master' into prefect-source-integration
treff7es Feb 8, 2024
eefb576
Address review comments
shubhamjagtap639 Feb 8, 2024
021eede
Modify prefect-plugin yml file
shubhamjagtap639 Feb 9, 2024
5 changes: 3 additions & 2 deletions .github/workflows/build-and-test.yml
@@ -26,9 +26,10 @@ jobs:
matrix:
command:
[
-"./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :datahub-frontend:build -x :datahub-web-react:build --parallel",
+"./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :metadata-ingestion-modules:prefect-plugin:build -x :datahub-frontend:build -x :datahub-web-react:build --parallel",
"./gradlew :datahub-frontend:build :datahub-web-react:build --parallel",
-"./gradlew :metadata-ingestion-modules:airflow-plugin:build --parallel"
+"./gradlew :metadata-ingestion-modules:airflow-plugin:build --parallel",
+"./gradlew :metadata-ingestion-modules:prefect-plugin:build --parallel"
]
timezone:
[
78 changes: 78 additions & 0 deletions .github/workflows/prefect-plugin.yml
@@ -0,0 +1,78 @@
name: Prefect Plugin
on:
push:
branches:
- master
paths:
- ".github/workflows/prefect-plugin.yml"
- "metadata-ingestion-modules/prefect-plugin/**"
- "metadata-ingestion/**"
- "metadata-models/**"
pull_request:
branches:
- master
paths:
- ".github/**"
- "metadata-ingestion-modules/prefect-plugin/**"
- "metadata-ingestion/**"
- "metadata-models/**"
release:
types: [published]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
prefect-plugin:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.7", "3.10"]
include:
- python-version: "3.7"
- python-version: "3.10"
fail-fast: false
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install prefect package and test (extras ${{ matrix.extraPythonRequirement }})

[actionlint, reported by reviewdog] Check failure on line 47 in .github/workflows/prefect-plugin.yml (47:61): property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]
run: ./gradlew -Pextra_pip_requirements='${{ matrix.extraPythonRequirement }}' :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:airflow-plugin:testQuick

[actionlint, reported by reviewdog] Check failure on line 48 in .github/workflows/prefect-plugin.yml (48:54): property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]
- name: pip freeze show list installed
if: always()
run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze
- uses: actions/upload-artifact@v3
if: ${{ always() && matrix.python-version == '3.10' && matrix.extraPythonRequirement == 'prefect>=2.0.0' }}

[actionlint, reported by reviewdog] Check failure on line 53 in .github/workflows/prefect-plugin.yml (53:64): property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]
with:
name: Test Results (Prefect Plugin ${{ matrix.python-version}})
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
**/junit.*.xml
- name: Upload coverage to Codecov
if: always()
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: .
fail_ci_if_error: false
flags: prefect-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }}

[actionlint, reported by reviewdog] Check failure on line 67 in .github/workflows/prefect-plugin.yml (67:59): property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]
name: pytest-prefect
verbose: true

event-file:
runs-on: ubuntu-latest
steps:
- name: Upload
uses: actions/upload-artifact@v3
with:
name: Event File
path: ${{ github.event_path }}
2 changes: 1 addition & 1 deletion .github/workflows/test-results.yml
@@ -2,7 +2,7 @@ name: Test Results

on:
workflow_run:
-workflows: ["build & test", "metadata ingestion", "Airflow Plugin"]
+workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Prefect Plugin"]
types:
- completed

3 changes: 2 additions & 1 deletion docs-website/build.gradle
@@ -71,7 +71,8 @@ task yarnInstall(type: YarnTask) {
task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
generateGraphQLSchema, generateJsonSchema,
':metadata-ingestion:modelDocGen', ':metadata-ingestion:docGen',
-':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel'] ) {
+':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel',
+':metadata-ingestion-modules:prefect-plugin:buildWheel'] ) {
inputs.files(projectMdFiles)
outputs.cacheIf { true }
args = ['run', 'generate']
1 change: 1 addition & 0 deletions docs-website/generateDocsDir.ts
@@ -573,6 +573,7 @@ function copy_python_wheels(): void {
const wheel_dirs = [
"../metadata-ingestion/dist",
"../metadata-ingestion-modules/airflow-plugin/dist",
"../metadata-ingestion-modules/prefect-plugin/dist",
];

const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels");
8 changes: 8 additions & 0 deletions docs-website/sidebars.js
@@ -91,6 +91,11 @@ module.exports = {
id: "docs/lineage/airflow",
label: "Airflow",
},
{
type: "doc",
id: "docs/lineage/prefect",
label: "Prefect",
},

//"docker/airflow/local_airflow",
"metadata-integration/java/spark-lineage/README",
@@ -638,6 +643,9 @@ module.exports = {
//"docs/how/graph-onboarding",
//"docs/demo/graph-onboarding",
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/prefect-plugin/README"
//"metadata-ingestion-modules/prefect-plugin/docs/concept_mapping"
//"metadata-ingestion-modules/prefect-plugin/docs/datahub_emitter"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"
// "docs/what/entity",
49 changes: 49 additions & 0 deletions docs/lineage/prefect.md
@@ -0,0 +1,49 @@
# Prefect Integration

DataHub supports integration of:

- Prefect flow and task metadata
- Flow run and task run information
- Lineage information, when present

## What is the Prefect Datahub Block?

Blocks are a primitive within Prefect that enable the storage of configuration and provide an interface for interacting with external systems. The [prefect-datahub](https://prefecthq.github.io/prefect-datahub/) block uses the [Datahub Rest](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to emit metadata events while a Prefect flow is running.

## Prerequisites to use Prefect Datahub Block

1. You need to use either Prefect Cloud (recommended) or a self-hosted Prefect server.
2. Refer to the [Cloud Quickstart](https://docs.prefect.io/2.10.13/cloud/cloud-quickstart/) to set up Prefect Cloud.
3. Refer to [Host Prefect server](https://docs.prefect.io/2.10.13/host/) to set up a self-hosted Prefect server.
4. Make sure the Prefect API URL is set correctly. You can check it by running the command below:
```shell
prefect profile inspect
```
5. If you are using Prefect Cloud, the API URL should be set as `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`.
6. If you are using a self-hosted Prefect server, the API URL should be set as `http://<host>:<port>/api`.
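
The two URL shapes listed above can be sanity-checked with a small script. The following is a minimal sketch, not part of Prefect or DataHub: the function name `classify_prefect_api_url` and the regular expression are assumptions made for this illustration, and the check only looks at the shape of the URL, not at whether the server responds.

```python
import re
from urllib.parse import urlparse

# Path shape for Prefect Cloud, taken from the prerequisites above.
# The pattern itself is an assumption for this sketch.
_CLOUD_PATH = re.compile(r"^/api/accounts/[^/]+/workspaces/[^/]+/?$")


def classify_prefect_api_url(url: str) -> str:
    """Return "cloud", "self-hosted", or "invalid" (hypothetical helper)."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return "invalid"
    if parsed.hostname == "api.prefect.cloud":
        # Prefect Cloud URLs carry the account and workspace IDs in the path.
        if parsed.scheme == "https" and _CLOUD_PATH.match(parsed.path):
            return "cloud"
        return "invalid"
    # A self-hosted server is expected to expose the API under /api.
    if parsed.path.rstrip("/") == "/api":
        return "self-hosted"
    return "invalid"
```

With this sketch, `classify_prefect_api_url("http://127.0.0.1:4200/api")` is classified as self-hosted, while a URL that omits the `/api` suffix is reported as invalid.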

## Setup

For setup details, please refer to [prefect-datahub](https://prefecthq.github.io/prefect-datahub/).

## How to validate the saved block and metadata emission

1. In the Prefect UI, open the Blocks menu and check that the Datahub emitter is listed.
2. Run a Prefect workflow. In the flow logs, you should see Datahub-related log messages like:

```
Emitting flow to datahub...
Emitting tasks to datahub...
```
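
If the flow logs are saved to a file or captured as text, the check in step 2 can be automated. The sketch below is only an illustration: the helper name `missing_emit_messages` is hypothetical, and it assumes the two log lines appear verbatim as shown above.

```python
from typing import List

# The two messages quoted in the validation steps above.
EXPECTED_MESSAGES = (
    "Emitting flow to datahub...",
    "Emitting tasks to datahub...",
)


def missing_emit_messages(log_text: str) -> List[str]:
    """Return the expected messages that do not occur in log_text."""
    return [message for message in EXPECTED_MESSAGES if message not in log_text]
```

An empty result means both messages were found. A non-empty result lists the messages to look for when debugging.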
## Debugging

### Incorrect Prefect API URL

If your Prefect API URL is not generated correctly or is set incorrectly, you can set it manually as shown below:

```shell
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
```

### Connection error for Datahub Rest URL

If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your GMS service is not up.
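
Before rerunning a flow, it can help to confirm that something is listening on the GMS host and port. The following is a minimal sketch under stated assumptions: the helper name `gms_is_reachable` is hypothetical, the defaults are taken from the error message above, and the check only tests TCP reachability, not whether GMS itself is healthy.

```python
import socket


def gms_is_reachable(host: str = "localhost", port: int = 8080, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (refused, timed out, unresolvable host).
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

If this returns `False` for the host and port configured in the block, start or restart GMS before running the flow again.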
143 changes: 143 additions & 0 deletions metadata-ingestion-modules/prefect-plugin/.gitignore
@@ -0,0 +1,143 @@
.envrc
src/prefect_datahub/__init__.py.bak
.vscode/
output
pvenv36/
bq_credentials.json
/tmp
*.bak

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Generated classes
src/datahub/metadata/
wheels/
junit.quick.xml