Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integration/prefect): prefect block to emit prefect pipeline #8414

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
978bfbb
Prefect source integration code added
shubhamjagtap639 Jul 13, 2023
a6f2a5e
prefect-dataub package integrated with datahub
shubhamjagtap639 Jul 13, 2023
e84c1ac
Prefect doc Spell mistake corrected
shubhamjagtap639 Jul 13, 2023
2bef6cb
Remove not necessary md file
shubhamjagtap639 Jul 13, 2023
0910f09
Version added for some pakages in prefect-datahub
shubhamjagtap639 Jul 14, 2023
edfead9
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Jul 14, 2023
5efc31e
Prefect version 2.0.0 restriction removed
shubhamjagtap639 Jul 14, 2023
745515c
Merge branch 'prefect-source-integration' of https://github.com/shubh…
shubhamjagtap639 Jul 14, 2023
43bf87f
Prefect version set to >=2.0.0
shubhamjagtap639 Jul 17, 2023
8b027a1
prefect-datahub build error fixed for python 3.7
shubhamjagtap639 Jul 17, 2023
a4ed11a
mypy stubs packages added
shubhamjagtap639 Jul 17, 2023
0949263
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Jul 17, 2023
0a7d63e
acryl-datahub package added
shubhamjagtap639 Jul 18, 2023
3d17c79
Merge branch 'prefect-source-integration' of https://github.com/shubh…
shubhamjagtap639 Jul 18, 2023
1f5b9ca
Added some missing required setup files
shubhamjagtap639 Jul 18, 2023
3976323
Extra packages added in requirements-dev
shubhamjagtap639 Jul 18, 2023
70c298a
Added some extra packages
shubhamjagtap639 Jul 18, 2023
a142203
temp changes
shubhamjagtap639 Jul 18, 2023
6612458
Revert temp changes
shubhamjagtap639 Jul 18, 2023
1b861fc
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Aug 8, 2023
be8c35a
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Sep 11, 2023
2075ac2
Prefect plugin code modified as per latest airflow plugin code
shubhamjagtap639 Sep 13, 2023
6b4ff59
Merge branch 'master' into prefect-source-integration
shubhamjagtap639 Sep 13, 2023
e4299d4
Add epoch 1 for dev build versions
shubhamjagtap639 Sep 13, 2023
cfde3ae
build error fixed
shubhamjagtap639 Sep 13, 2023
f963b55
syntax error resolved
shubhamjagtap639 Sep 13, 2023
2ba316e
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Feb 5, 2024
5dbebbc
Merge branch 'master' into prefect-source-integration
treff7es Feb 8, 2024
eefb576
Address review comments
shubhamjagtap639 Feb 8, 2024
021eede
Modify prefect-plugin yml file
shubhamjagtap639 Feb 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
- name: Gradle build (and test) for NOT metadata ingestion
if: ${{ matrix.command == 'except_metadata_ingestion' && needs.setup.outputs.backend_change == 'true' }}
run: |
./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :metadata-ingestion-modules:airflow-plugin:check -x :datahub-frontend:build -x :datahub-web-react:build --parallel
./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :metadata-ingestion-modules:airflow-plugin:check -x :metadata-ingestion-modules:prefect-plugin:build -x :metadata-ingestion-modules:prefect-plugin:check -x :datahub-frontend:build -x :datahub-web-react:build --parallel
- name: Gradle build (and test) for frontend
if: ${{ matrix.command == 'frontend' && needs.setup.outputs.frontend_change == 'true' }}
run: |
Expand Down
85 changes: 85 additions & 0 deletions .github/workflows/prefect-plugin.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
name: Prefect Plugin
on:
push:
branches:
- master
paths:
- ".github/workflows/prefect-plugin.yml"
- "metadata-ingestion-modules/prefect-plugin/**"
- "metadata-ingestion/**"
- "metadata-models/**"
pull_request:
branches:
- "**"
paths:
- ".github/workflows/prefect-plugin.yml"
- "metadata-ingestion-modules/prefect-plugin/**"
- "metadata-ingestion/**"
- "metadata-models/**"
release:
types: [published]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
prefect-plugin:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.7", "3.10"]
include:
- python-version: "3.7"
- python-version: "3.10"
fail-fast: false
steps:
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
distribution: "zulu"
java-version: 17
- uses: gradle/gradle-build-action@v2
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install prefect package
run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick
- name: pip freeze show list installed
if: always()
run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze
- uses: actions/upload-artifact@v3
if: ${{ always() && matrix.python-version == '3.10'}}
with:
name: Test Results (Prefect Plugin ${{ matrix.python-version}})
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
**/junit.*.xml
!**/binary/**
- name: Upload coverage to Codecov
if: always()
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: .
fail_ci_if_error: false
flags: prefect,prefect-${{ matrix.extra_pip_extras }}

Check failure on line 74 in .github/workflows/prefect-plugin.yml

View workflow job for this annotation

GitHub Actions / actionlint

[actionlint] reported by reviewdog 🐶 property "extra_pip_extras" is not defined in object type {python-version: number} [expression] Raw Output: .github/workflows/prefect-plugin.yml:74:38: property "extra_pip_extras" is not defined in object type {python-version: number} [expression]
name: pytest-prefect-${{ matrix.python-version }}
verbose: true

event-file:
runs-on: ubuntu-latest
steps:
- name: Upload
uses: actions/upload-artifact@v3
with:
name: Event File
path: ${{ github.event_path }}
2 changes: 1 addition & 1 deletion .github/workflows/test-results.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Test Results

on:
workflow_run:
workflows: ["build & test", "metadata ingestion", "Airflow Plugin"]
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Prefect Plugin"]
types:
- completed

Expand Down
3 changes: 2 additions & 1 deletion docs-website/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ task yarnInstall(type: YarnTask) {
task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
generateGraphQLSchema, generateJsonSchema,
':metadata-ingestion:modelDocGen', ':metadata-ingestion:docGen',
':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel'] ) {
':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel',
':metadata-ingestion-modules:prefect-plugin:buildWheel'] ) {
inputs.files(projectMdFiles)
outputs.cacheIf { true }
args = ['run', 'generate']
Expand Down
1 change: 1 addition & 0 deletions docs-website/generateDocsDir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ function copy_python_wheels(): void {
const wheel_dirs = [
"../metadata-ingestion/dist",
"../metadata-ingestion-modules/airflow-plugin/dist",
"../metadata-ingestion-modules/prefect-plugin/dist",
];

const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels");
Expand Down
8 changes: 8 additions & 0 deletions docs-website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ module.exports = {
id: "docs/lineage/airflow",
label: "Airflow",
},
{
type: "doc",
id: "docs/lineage/prefect",
label: "Prefect",
},
//"docker/airflow/local_airflow",
"metadata-integration/java/spark-lineage/README",
"metadata-ingestion/integration_docs/great-expectations",
Expand Down Expand Up @@ -750,6 +755,9 @@ module.exports = {
//"docs/how/graph-onboarding",
//"docs/demo/graph-onboarding",
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/prefect-plugin/README"
//"metadata-ingestion-modules/prefect-plugin/docs/concept_mapping"
//"metadata-ingestion-modules/prefect-plugin/docs/datahub_emitter"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"
// "docs/what/entity",
Expand Down
135 changes: 135 additions & 0 deletions docs/lineage/prefect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Prefect Integration

DataHub supports integration of

- Prefect flow and task metadata
- Flow run and Task run information as well as
- Lineage information when present

## What is Prefect Datahub Block?

Blocks are primitive within Prefect that enable the storage of configuration and provide an interface for interacting with external systems. We integrated `prefect-datahub` block which use [Datahub Rest](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to emit metadata events while running prefect flow.

## Prerequisites to use Prefect Datahub Block

1. You need to use either Prefect Cloud (recommended) or the self hosted Prefect server.
2. Refer [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) to setup Prefect Cloud.
3. Refer [Host Prefect server](https://docs.prefect.io/latest/guides/host/) to setup self hosted Prefect server.
4. Make sure the Prefect api url is set correctly. You can check it by running below command:
```shell
prefect profile inspect
```
5. If you are using Prefect Cloud, the API URL should be set as `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`.
6. If you are using a self-hosted Prefect server, the API URL should be set as `http://<host>:<port>/api`.

## Setup

### Installation

Install `prefect-datahub` with `pip`:

```shell
pip install 'prefect-datahub'
```

Requires an installation of Python 3.7+.

### Saving configurations to a block

This is a one-time activity, where you can save the configuration on the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks).
While saving you can provide below configurations. Default value will get set if not provided while saving the configuration to block.

Config | Type | Default | Description
--- | --- | --- | ---
datahub_rest_url | `str` | *http://localhost:8080* | DataHub GMS REST URL
env | `str` | *PROD* | The environment that all assets produced by this orchestrator belong to. For more detail and possible values refer [here](https://datahubproject.io/docs/graphql/enums/#fabrictype).
platform_instance | `str` | *None* | The instance of the platform that all assets produced by this recipe belong to. For more detail please refer [here](https://datahubproject.io/docs/platform-instances/).

```python
from prefect_datahub.datahub_emitter import DatahubEmitter
DatahubEmitter(
datahub_rest_url="http://localhost:8080",
env="PROD",
platform_instance="local_prefect"
).save("BLOCK-NAME-PLACEHOLDER")
```

Congrats! You can now load the saved block to use your configurations in your Flow code:

```python
from prefect_datahub.datahub_emitter import DatahubEmitter
DatahubEmitter.load("BLOCK-NAME-PLACEHOLDER")
```

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_datahub
```

### Load the saved block in prefect workflows

After installing `prefect-datahub` and [saving the configution](#saving-configurations-to-a-block), you can easily use it within your prefect workflows to help you emit metadata event as show below!

```python
from prefect import flow, task
from prefect_datahub.dataset import Dataset
from prefect_datahub.datahub_emitter import DatahubEmitter

datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")

@task(name="Transform", description="Transform the data")
def transform(data):
data = data.split(" ")
datahub_emitter.add_task(
inputs=[Dataset("snowflake", "mydb.schema.tableA")],
outputs=[Dataset("snowflake", "mydb.schema.tableC")],
)
return data

@flow(name="ETL flow", description="Extract transform load flow")
def etl():
data = transform("This is data")
datahub_emitter.emit_flow()
```

**Note**: To emit the tasks, user compulsory need to emit flow. Otherwise nothing will get emit.

## Concept mapping

Prefect concepts are documented [here](https://docs.prefect.io/latest/concepts/), and datahub concepts are documented [here](https://datahubproject.io/docs/what-is-datahub/datahub-concepts).

Prefect Concept | DataHub Concept
--- | ---
[Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/)
[Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance)
[Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/)
[Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance)
[Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/)


## How to validate saved block and emit of metadata

1. Go and check in Prefect UI at the Blocks menu if you can see the datahub emitter.
2. Run a Prefect workflow. In the flow logs, you should see Datahub related log messages like:

```
Emitting flow to datahub...
Emitting tasks to datahub...
```
## Debugging

### Incorrect Prefect API URL

If your Prefect API URL aren't being generated correctly or set incorrectly, then in that case you can set the Prefect API URL manually as show below:

```shell
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
```

### Connection error for Datahub Rest URL
If you get ConnectionError: HTTPConnectionPool(host='localhost', port=8080), then in that case your GMS service is not up.
Loading
Loading