diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 6b7f2b5035c25..b66500da35c7b 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -76,7 +76,7 @@ jobs: - name: Gradle build (and test) for NOT metadata ingestion if: ${{ matrix.command == 'except_metadata_ingestion' && needs.setup.outputs.backend_change == 'true' }} run: | - ./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :metadata-ingestion-modules:airflow-plugin:check -x :datahub-frontend:build -x :datahub-web-react:build --parallel + ./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :metadata-ingestion-modules:airflow-plugin:check -x :metadata-ingestion-modules:prefect-plugin:build -x :metadata-ingestion-modules:prefect-plugin:check -x :datahub-frontend:build -x :datahub-web-react:build --parallel - name: Gradle build (and test) for frontend if: ${{ matrix.command == 'frontend' && needs.setup.outputs.frontend_change == 'true' }} run: | diff --git a/.github/workflows/prefect-plugin.yml b/.github/workflows/prefect-plugin.yml new file mode 100644 index 0000000000000..47bf417029330 --- /dev/null +++ b/.github/workflows/prefect-plugin.yml @@ -0,0 +1,85 @@ +name: Prefect Plugin +on: + push: + branches: + - master + paths: + - ".github/workflows/prefect-plugin.yml" + - "metadata-ingestion-modules/prefect-plugin/**" + - "metadata-ingestion/**" + - "metadata-models/**" + pull_request: + branches: + - "**" + paths: + - ".github/workflows/prefect-plugin.yml" + - "metadata-ingestion-modules/prefect-plugin/**" + - "metadata-ingestion/**" + - "metadata-models/**" + release: + types: [published] + 
+concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + prefect-plugin: + runs-on: ubuntu-latest + env: + SPARK_VERSION: 3.0.3 + DATAHUB_TELEMETRY_ENABLED: false + strategy: + matrix: + python-version: ["3.7", "3.10"] + include: + - python-version: "3.7" + - python-version: "3.10" + fail-fast: false + steps: + - name: Set up JDK 17 + uses: actions/setup-java@v3 + with: + distribution: "zulu" + java-version: 17 + - uses: gradle/gradle-build-action@v2 + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + - name: Install dependencies + run: ./metadata-ingestion/scripts/install_deps.sh + - name: Install prefect package + run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick + - name: pip freeze show list installed + if: always() + run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze + - uses: actions/upload-artifact@v3 + if: ${{ always() && matrix.python-version == '3.10'}} + with: + name: Test Results (Prefect Plugin ${{ matrix.python-version}}) + path: | + **/build/reports/tests/test/** + **/build/test-results/test/** + **/junit.*.xml + !**/binary/** + - name: Upload coverage to Codecov + if: always() + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + directory: . 
+ fail_ci_if_error: false + flags: prefect,prefect-${{ matrix.extra_pip_extras }} + name: pytest-prefect-${{ matrix.python-version }} + verbose: true + + event-file: + runs-on: ubuntu-latest + steps: + - name: Upload + uses: actions/upload-artifact@v3 + with: + name: Event File + path: ${{ github.event_path }} diff --git a/.github/workflows/test-results.yml b/.github/workflows/test-results.yml index 0153060692271..cdb89b1fa6b5d 100644 --- a/.github/workflows/test-results.yml +++ b/.github/workflows/test-results.yml @@ -2,7 +2,7 @@ name: Test Results on: workflow_run: - workflows: ["build & test", "metadata ingestion", "Airflow Plugin"] + workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Prefect Plugin"] types: - completed diff --git a/docs-website/build.gradle b/docs-website/build.gradle index 2644491a2a5f8..702116fece0f5 100644 --- a/docs-website/build.gradle +++ b/docs-website/build.gradle @@ -73,7 +73,8 @@ task yarnInstall(type: YarnTask) { task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall, generateGraphQLSchema, generateJsonSchema, ':metadata-ingestion:modelDocGen', ':metadata-ingestion:docGen', - ':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel'] ) { + ':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel', + ':metadata-ingestion-modules:prefect-plugin:buildWheel'] ) { inputs.files(projectMdFiles) outputs.cacheIf { true } args = ['run', 'generate'] diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts index e19f09530665a..939535b4229fa 100644 --- a/docs-website/generateDocsDir.ts +++ b/docs-website/generateDocsDir.ts @@ -572,6 +572,7 @@ function copy_python_wheels(): void { const wheel_dirs = [ "../metadata-ingestion/dist", "../metadata-ingestion-modules/airflow-plugin/dist", + "../metadata-ingestion-modules/prefect-plugin/dist", ]; const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels"); diff --git 
a/docs-website/sidebars.js b/docs-website/sidebars.js index 1e6d8bec01813..aa57c815f7ac1 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -298,6 +298,11 @@ module.exports = { id: "docs/lineage/airflow", label: "Airflow", }, + { + type: "doc", + id: "docs/lineage/prefect", + label: "Prefect", + }, //"docker/airflow/local_airflow", "metadata-integration/java/spark-lineage/README", "metadata-ingestion/integration_docs/great-expectations", @@ -750,6 +755,9 @@ module.exports = { //"docs/how/graph-onboarding", //"docs/demo/graph-onboarding", //"metadata-ingestion-modules/airflow-plugin/README" + //"metadata-ingestion-modules/prefect-plugin/README" + //"metadata-ingestion-modules/prefect-plugin/docs/concept_mapping" + //"metadata-ingestion-modules/prefect-plugin/docs/datahub_emitter" // "metadata-ingestion/schedule_docs/datahub", // we can delete this // TODO: change the titles of these, removing the "What is..." portion from the sidebar" // "docs/what/entity", diff --git a/docs/lineage/prefect.md b/docs/lineage/prefect.md new file mode 100644 index 0000000000000..1246e781142d7 --- /dev/null +++ b/docs/lineage/prefect.md @@ -0,0 +1,135 @@ +# Prefect Integration + +DataHub supports integration of + +- Prefect flow and task metadata +- Flow run and Task run information as well as +- Lineage information when present + +## What is Prefect Datahub Block? + +Blocks are primitive within Prefect that enable the storage of configuration and provide an interface for interacting with external systems. We integrated `prefect-datahub` block which use [Datahub Rest](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to emit metadata events while running prefect flow. + +## Prerequisites to use Prefect Datahub Block + +1. You need to use either Prefect Cloud (recommended) or the self hosted Prefect server. +2. Refer [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) to setup Prefect Cloud. +3. 
Refer [Host Prefect server](https://docs.prefect.io/latest/guides/host/) to setup self hosted Prefect server. +4. Make sure the Prefect api url is set correctly. You can check it by running below command: +```shell +prefect profile inspect +``` +5. If you are using Prefect Cloud, the API URL should be set as `https://api.prefect.cloud/api/accounts//workspaces/`. +6. If you are using a self-hosted Prefect server, the API URL should be set as `http://:/api`. + +## Setup + +### Installation + +Install `prefect-datahub` with `pip`: + +```shell +pip install 'prefect-datahub' +``` + +Requires an installation of Python 3.7+. + +### Saving configurations to a block + +This is a one-time activity, where you can save the configuration on the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks). +While saving you can provide below configurations. Default value will get set if not provided while saving the configuration to block. + +Config | Type | Default | Description +--- | --- | --- | --- +datahub_rest_url | `str` | *http://localhost:8080* | DataHub GMS REST URL +env | `str` | *PROD* | The environment that all assets produced by this orchestrator belong to. For more detail and possible values refer [here](https://datahubproject.io/docs/graphql/enums/#fabrictype). +platform_instance | `str` | *None* | The instance of the platform that all assets produced by this recipe belong to. For more detail please refer [here](https://datahubproject.io/docs/platform-instances/). + +```python +from prefect_datahub.datahub_emitter import DatahubEmitter +DatahubEmitter( + datahub_rest_url="http://localhost:8080", + env="PROD", + platform_instance="local_prefect" +).save("BLOCK-NAME-PLACEHOLDER") +``` + +Congrats! You can now load the saved block to use your configurations in your Flow code: + +```python +from prefect_datahub.datahub_emitter import DatahubEmitter +DatahubEmitter.load("BLOCK-NAME-PLACEHOLDER") +``` + +!!! 
info "Registering blocks" + + Register blocks in this module to + [view and edit them](https://docs.prefect.io/ui/blocks/) + on Prefect Cloud: + + ```bash + prefect block register -m prefect_datahub + ``` + +### Load the saved block in prefect workflows + +After installing `prefect-datahub` and [saving the configuration](#saving-configurations-to-a-block), you can easily use it within your prefect workflows to help you emit metadata events as shown below! + +```python +from prefect import flow, task +from prefect_datahub.dataset import Dataset +from prefect_datahub.datahub_emitter import DatahubEmitter + +datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME") + +@task(name="Transform", description="Transform the data") +def transform(data): + data = data.split(" ") + datahub_emitter.add_task( + inputs=[Dataset("snowflake", "mydb.schema.tableA")], + outputs=[Dataset("snowflake", "mydb.schema.tableC")], + ) + return data + +@flow(name="ETL flow", description="Extract transform load flow") +def etl(): + data = transform("This is data") + datahub_emitter.emit_flow() +``` + +**Note**: To emit the tasks, you must also emit the flow; otherwise nothing will be emitted. + +## Concept mapping + +Prefect concepts are documented [here](https://docs.prefect.io/latest/concepts/), and datahub concepts are documented [here](https://datahubproject.io/docs/what-is-datahub/datahub-concepts). 
+ Prefect Concept | DataHub Concept +--- | --- +[Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/) +[Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) +[Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) +[Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) +[Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) + + +## How to validate the saved block and emitted metadata + +1. Check the Blocks menu in the Prefect UI to confirm that the datahub emitter block is visible. +2. Run a Prefect workflow. In the flow logs, you should see Datahub related log messages like: + +``` +Emitting flow to datahub... +Emitting tasks to datahub... +``` +## Debugging + +### Incorrect Prefect API URL + +If your Prefect API URL isn't being generated correctly or is set incorrectly, you can set the Prefect API URL manually as shown below: + +```shell +prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api' +``` + +### Connection error for Datahub Rest URL +If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your GMS service is not up. 
diff --git a/metadata-ingestion-modules/prefect-plugin/.gitignore b/metadata-ingestion-modules/prefect-plugin/.gitignore new file mode 100644 index 0000000000000..1d2916d00eabd --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/.gitignore @@ -0,0 +1,143 @@ +.envrc +src/prefect_datahub/__init__.py.bak +.vscode/ +output +pvenv36/ +bq_credentials.json +/tmp +*.bak + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Generated classes +src/datahub/metadata/ +wheels/ +junit.quick.xml diff --git a/metadata-ingestion-modules/prefect-plugin/README.md b/metadata-ingestion-modules/prefect-plugin/README.md new file mode 100644 index 0000000000000..2548221fb5591 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/README.md @@ -0,0 +1,126 @@ +# Emit flows & tasks metadata to DataHub rest with `prefect-datahub` + +

+ + PyPI + + + + + + +
+ + +

+ +## Welcome! + +The `prefect-datahub` collection makes it easy to leverage the capabilities of DataHub emitter in your flows, featuring support for ingesting metadata of flows, tasks & workspace to DataHub gms rest. + + +## Getting Started + +### Setup DataHub UI + +In order to use 'prefect-datahub' collection, you'll first need to deploy the new instance of DataHub. + +You can get the instructions on deploying the open source DataHub by navigating to the [apps page](https://datahubproject.io/docs/quickstart). + +Successful deployment of DataHub will lead creation of DataHub GMS service running on 'http://localhost:8080' if you have deployed it on local system. + +### Saving configurations to a block + + +This is a one-time activity, where you can save the configuration on the [Prefect block document store](https://docs.prefect.io/2.10.13/concepts/blocks/#saving-blocks). +While saving you can provide below configurations. Default value will get set if not provided while saving the configuration to block. + +Config | Type | Default | Description +--- | --- | --- | --- +datahub_rest_url | `str` | *http://localhost:8080* | DataHub GMS REST URL +env | `str` | *PROD* | The environment that all assets produced by this orchestrator belong to. For more detail and possible values refer [here](https://datahubproject.io/docs/graphql/enums/#fabrictype). +platform_instance | `str` | *None* | The instance of the platform that all assets produced by this recipe belong to. For more detail please refer [here](https://datahubproject.io/docs/platform-instances/). + +```python +from prefect_datahub.datahub_emitter import DatahubEmitter +DatahubEmitter( + datahub_rest_url="http://localhost:8080", + env="PROD", + platform_instance="local_prefect" +).save("BLOCK-NAME-PLACEHOLDER") +``` + +Congrats! 
You can now load the saved block to use your configurations in your Flow code: + +```python +from prefect_datahub.datahub_emitter import DatahubEmitter +DatahubEmitter.load("BLOCK-NAME-PLACEHOLDER") +``` + +!!! info "Registering blocks" + + Register blocks in this module to + [view and edit them](https://docs.prefect.io/ui/blocks/) + on Prefect Cloud: + + ```bash + prefect block register -m prefect_datahub + ``` + +### Load the saved block in prefect workflows + +After installing `prefect-datahub` and [saving the configuration](#saving-configurations-to-a-block), you can easily use it within your prefect workflows to help you emit metadata events as shown below! + +```python +from prefect import flow, task +from prefect_datahub.dataset import Dataset +from prefect_datahub.datahub_emitter import DatahubEmitter + +datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME") + +@task(name="Transform", description="Transform the data") +def transform(data): + data = data.split(" ") + datahub_emitter.add_task( + inputs=[Dataset("snowflake", "mydb.schema.tableA")], + outputs=[Dataset("snowflake", "mydb.schema.tableC")], + ) + return data + +@flow(name="ETL flow", description="Extract transform load flow") +def etl(): + data = transform("This is data") + datahub_emitter.emit_flow() +``` + +**Note**: To emit the tasks, you must also emit the flow; otherwise nothing will be emitted. + +## Resources + +For more tips on how to use tasks and flows in a Collection, check out [Using Collections](https://docs.prefect.io/collections/usage/)! + +### Installation + +Install `prefect-datahub` with `pip`: + +```bash +pip install prefect-datahub +``` + +Requires an installation of Python 3.7+. + +We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv. + +These tasks are designed to work with Prefect 2.0.0 or higher. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/). 
+ +### Feedback + +If you encounter any bugs while using `prefect-datahub`, feel free to open an issue in the [datahub](https://github.com/datahub-project/datahub) repository. + +If you have any questions or issues while using `prefect-datahub`, you can find help in the [Prefect Slack community](https://prefect.io/slack). + +Feel free to star or watch [`datahub`](https://github.com/datahub-project/datahub) for updates too! + +### Contributing + +If you'd like to help contribute to fix an issue or add a feature to `prefect-datahub`, please refer to our [Contributing Guidelines](https://datahubproject.io/docs/contributing). diff --git a/metadata-ingestion-modules/prefect-plugin/build.gradle b/metadata-ingestion-modules/prefect-plugin/build.gradle new file mode 100644 index 0000000000000..ced0b8da5b508 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/build.gradle @@ -0,0 +1,126 @@ +plugins { + id 'base' +} + +ext { + python_executable = 'python3' + venv_name = 'venv' +} + +if (!project.hasProperty("extra_pip_requirements")) { + ext.extra_pip_requirements = "" +} + +def pip_install_command = "${venv_name}/bin/pip install -e ../../metadata-ingestion" + +task checkPythonVersion(type: Exec) { + commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 7)' +} + +task environmentSetup(type: Exec, dependsOn: checkPythonVersion) { + def sentinel_file = "${venv_name}/.venv_environment_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "${python_executable} -m venv ${venv_name} &&" + + "${venv_name}/bin/python -m pip install --upgrade pip wheel 'setuptools>=63.0.0' && " + + "touch ${sentinel_file}" +} + +task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) { + def sentinel_file = "${venv_name}/.build_install_package_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + // Workaround for https://github.com/yaml/pyyaml/issues/601. 
+ // See https://github.com/yaml/pyyaml/issues/601#issuecomment-1638509577. + // and https://github.com/datahub-project/datahub/pull/8435. + commandLine 'bash', '-x', '-c', + "${pip_install_command} install 'Cython<3.0' 'PyYAML<6' --no-build-isolation && " + + "${pip_install_command} -e . ${extra_pip_requirements} &&" + + "touch ${sentinel_file}" +} + +task install(dependsOn: [installPackage]) + +task installDev(type: Exec, dependsOn: [install]) { + def sentinel_file = "${venv_name}/.build_install_dev_sentinel" + inputs.file file('setup.py') + outputs.file("${sentinel_file}") + commandLine 'bash', '-x', '-c', + "${pip_install_command} -e .[dev] ${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +task lint(type: Exec, dependsOn: installDev) { + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "black --check --diff src/ tests/ && " + + "isort --check --diff src/ tests/ && " + + "flake8 --count --statistics src/ tests/ && " + + "mypy --show-traceback --show-error-codes src/ tests/" +} +task lintFix(type: Exec, dependsOn: installDev) { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && " + + "black src/ tests/ && " + + "isort src/ tests/ && " + + "flake8 src/ tests/ && " + + "mypy src/ tests/ " +} + +task installDevTest(type: Exec, dependsOn: [installDev]) { + def sentinel_file = "${venv_name}/.build_install_dev_test_sentinel" + inputs.file file('setup.py') + outputs.dir("${venv_name}") + outputs.file("${sentinel_file}") + commandLine 'bash', '-x', '-c', + "${pip_install_command} -e .[dev,integration-tests] && touch ${sentinel_file}" +} + +def testFile = hasProperty('testFile') ? testFile : 'unknown' +task testSingle(dependsOn: [installDevTest]) { + doLast { + if (testFile != 'unknown') { + exec { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest ${testFile}" + } + } else { + throw new GradleException("No file provided. 
Use -PtestFile=") + } + } +} + +task testQuick(type: Exec, dependsOn: installDevTest) { + // We can't enforce the coverage requirements if we run a subset of the tests. + inputs.files(project.fileTree(dir: "src/", include: "**/*.py")) + inputs.files(project.fileTree(dir: "tests/")) + outputs.dir("${venv_name}") + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest -vv --continue-on-collection-errors --junit-xml=junit.quick.xml" +} + + +task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml" +} +task buildWheel(type: Exec, dependsOn: [install]) { + commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " + 'pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_TEST=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh' +} + +task cleanPythonCache(type: Exec) { + commandLine 'bash', '-c', + "find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete" +} + +build.dependsOn install +check.dependsOn lint +check.dependsOn testQuick + +clean { + delete venv_name + delete 'build' + delete 'dist' +} +clean.dependsOn cleanPythonCache diff --git a/metadata-ingestion-modules/prefect-plugin/pyproject.toml b/metadata-ingestion-modules/prefect-plugin/pyproject.toml new file mode 100644 index 0000000000000..fba81486b9f67 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools>=54.0.0", "wheel", "pip>=21.0.0"] + +[tool.black] +extend-exclude = ''' +# A regex preceded with ^/ will apply only to files and directories +# in the root of the project. 
+^/tmp +''' +include = '\.pyi?$' + +[tool.isort] +indent = ' ' +profile = 'black' +sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER' + +[tool.pyright] +extraPaths = ['tests'] \ No newline at end of file diff --git a/metadata-ingestion-modules/prefect-plugin/scripts/release.sh b/metadata-ingestion-modules/prefect-plugin/scripts/release.sh new file mode 100755 index 0000000000000..f01287d3e3731 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/scripts/release.sh @@ -0,0 +1,26 @@ +#!/bin/bash +set -euxo pipefail + +if [[ ! ${RELEASE_SKIP_TEST:-} ]]; then + ../../gradlew build # also runs tests +elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then + ../../gradlew install +fi + +MODULE=prefect_datahub + +# Check packaging constraint. +python -c 'import setuptools; where="./src"; assert setuptools.find_packages(where) == setuptools.find_namespace_packages(where), "you seem to be missing or have extra __init__.py files"' +if [[ ${RELEASE_VERSION:-} ]]; then + # Replace version with RELEASE_VERSION env variable + sed -i.bak "s/__version__ = \"0.0.0.dev0\"/__version__ = \"$RELEASE_VERSION\"/" src/${MODULE}/__init__.py +else + vim src/${MODULE}/__init__.py +fi + +rm -rf build dist || true +python -m build +if [[ ! ${RELEASE_SKIP_UPLOAD:-} ]]; then + python -m twine upload 'dist/*' +fi +git restore src/${MODULE}/__init__.py diff --git a/metadata-ingestion-modules/prefect-plugin/setup.cfg b/metadata-ingestion-modules/prefect-plugin/setup.cfg new file mode 100644 index 0000000000000..c59a99fa8aec0 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/setup.cfg @@ -0,0 +1,74 @@ +[flake8] +max-complexity = 15 +ignore = + # Ignore: line length issues, since black's formatter will take care of them. + E501, + # Ignore: 1 blank line required before class docstring. + D203, + # See https://stackoverflow.com/a/57074416. + W503, + # See https://github.com/psf/black/issues/315. 
+ E203 +exclude = + .git, + venv, + .tox, + __pycache__ +per-file-ignores = + # imported but unused + __init__.py: F401 +ban-relative-imports = true + +[mypy] +plugins = + sqlmypy, + pydantic.mypy +exclude = ^(venv|build|dist)/ +ignore_missing_imports = yes +strict_optional = yes +check_untyped_defs = yes +disallow_incomplete_defs = yes +disallow_untyped_decorators = yes +warn_unused_configs = yes +# eventually we'd like to enable these +disallow_untyped_defs = no + +# try to be a bit more strict in certain areas of the codebase +[mypy-datahub.*] +ignore_missing_imports = no +[mypy-tests.*] +ignore_missing_imports = no + +[tool:pytest] +asyncio_mode = auto +addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers + +testpaths = + tests/unit + tests/integration + +[coverage:run] +# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov, +# and tox interact, we should not uncomment the following line. +# See https://pytest-cov.readthedocs.io/en/latest/config.html and +# https://coverage.readthedocs.io/en/coverage-5.0/config.html. +# We also have some additional pytest/cov config options in tox.ini. +# source = src + +[coverage:paths] +# This is necessary for tox-based coverage to be counted properly. +source = + src + */site-packages + +[coverage:report] +# The fail_under value ensures that at least some coverage data is collected. +# We override its value in the tox config. 
+show_missing = true +exclude_lines = + pragma: no cover + @abstract + if TYPE_CHECKING: +omit = + # omit example jobs + src/prefect_datahub/example/* diff --git a/metadata-ingestion-modules/prefect-plugin/setup.py b/metadata-ingestion-modules/prefect-plugin/setup.py new file mode 100644 index 0000000000000..530d0e24b2cb1 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/setup.py @@ -0,0 +1,121 @@ +import os +import pathlib + +import setuptools + +package_metadata: dict = {} +with open("./src/prefect_datahub/__init__.py") as fp: + exec(fp.read(), package_metadata) + + +def get_long_description(): + root = os.path.dirname(__file__) + return pathlib.Path(os.path.join(root, "README.md")).read_text() + + +rest_common = {"requests", "requests_file"} + +base_requirements = { + # For python 3.7 and importlib-metadata>=5.0.0, build failed with attribute error + "importlib-metadata>=4.4.0,<5.0.0; python_version < '3.8'", + # Actual dependencies. + "prefect >= 2.0.0", + *rest_common, + f"acryl-datahub == {package_metadata['__version__']}", +} + + +mypy_stubs = { + "types-dataclasses", + "sqlalchemy-stubs", + "types-pkg_resources", + "types-six", + "types-python-dateutil", + "types-requests", + "types-toml", + "types-PyYAML", + "types-freezegun", + "types-cachetools", + # versions 0.1.13 and 0.1.14 seem to have issues + "types-click==0.1.12", + "types-tabulate", + # avrogen package requires this + "types-pytz", +} + +dev_requirements = { + *base_requirements, + *mypy_stubs, + "black==22.12.0", + "coverage>=5.1", + "flake8>=3.8.3", + "flake8-tidy-imports>=4.3.0", + "isort>=5.7.0", + "mypy>=1.4.0", + # pydantic 1.8.2 is incompatible with mypy 0.910. + # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910. 
+ "pydantic>=1.10", + "pytest>=6.2.2", + "pytest-asyncio>=0.16.0", + "pytest-cov>=2.8.1", + "tox", + "deepdiff", + "requests-mock", + "freezegun", + "jsonpickle", + "build", + "twine", + "packaging", +} + +entry_points = { + "prefect.block": "prefect-datahub = prefect_datahub.prefect_datahub:DatahubEmitter" +} + + +setuptools.setup( + # Package metadata. + name=package_metadata["__package_name__"], + version=package_metadata["__version__"], + url="https://datahubproject.io/", + project_urls={ + "Documentation": "https://datahubproject.io/docs/", + "Source": "https://github.com/datahub-project/datahub", + "Changelog": "https://github.com/datahub-project/datahub/releases", + }, + license="Apache License 2.0", + description="Datahub prefect block to capture executions and send to Datahub", + long_description=get_long_description(), + long_description_content_type="text/markdown", + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Intended Audience :: Developers", + "Intended Audience :: Information Technology", + "Intended Audience :: System Administrators", + "License :: OSI Approved", + "License :: OSI Approved :: Apache Software License", + "Operating System :: Unix", + "Operating System :: POSIX :: Linux", + "Environment :: Console", + "Environment :: MacOS X", + "Topic :: Software Development", + ], + # Package info. + zip_safe=False, + python_requires=">=3.7", + package_dir={"": "src"}, + packages=setuptools.find_namespace_packages(where="./src"), + entry_points=entry_points, + # Dependencies. 
+ install_requires=list(base_requirements), + extras_require={ + "dev": list(dev_requirements), + }, +) diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/__init__.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/__init__.py new file mode 100644 index 0000000000000..8cc65f9010613 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/__init__.py @@ -0,0 +1,21 @@ +# Published at https://pypi.org/project/acryl-datahub/. +__package_name__ = "prefect-datahub" +__version__ = "1!0.0.0.dev0" + + +def is_dev_mode() -> bool: + return __version__.endswith("dev0") + + +def nice_version_name() -> str: + if is_dev_mode(): + return "unavailable (installed in develop mode)" + return __version__ + + +def get_provider_info(): + return { + "package-name": f"{__package_name__}", + "name": f"{__package_name__}", + "description": "Datahub prefect block to capture executions and send to Datahub", + } diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py new file mode 100644 index 0000000000000..51b6f7c74fd07 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/datahub_emitter.py @@ -0,0 +1,627 @@ +"""Datahub Emitter classes used to emit prefect metadata to Datahub REST.""" + +import asyncio +import traceback +from typing import Dict, List, Optional +from uuid import UUID + +from datahub.api.entities.datajob import DataFlow, DataJob +from datahub.api.entities.dataprocess.dataprocess_instance import ( + DataProcessInstance, + InstanceRunResult, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import BrowsePathsClass +from datahub.utilities.urns.data_flow_urn import DataFlowUrn +from datahub.utilities.urns.data_job_urn import DataJobUrn +from 
datahub.utilities.urns.dataset_urn import DatasetUrn
+from prefect import get_run_logger
+from prefect.blocks.core import Block
+from prefect.client import cloud, orchestration
+from prefect.client.schemas import FlowRun, TaskRun, Workspace
+from prefect.client.schemas.objects import Flow
+from prefect.context import FlowRunContext, TaskRunContext
+from prefect.settings import PREFECT_API_URL
+from pydantic import Field
+
+from prefect_datahub.entities import _Entity
+
+ORCHESTRATOR = "prefect"
+
+# Flow and task common constants
+VERSION = "version"
+RETRIES = "retries"
+TIMEOUT_SECONDS = "timeout_seconds"
+LOG_PRINTS = "log_prints"
+ON_COMPLETION = "on_completion"
+ON_FAILURE = "on_failure"
+
+# Flow constants
+FLOW_RUN_NAME = "flow_run_name"
+TASK_RUNNER = "task_runner"
+PERSIST_RESULT = "persist_result"
+ON_CANCELLATION = "on_cancellation"
+ON_CRASHED = "on_crashed"
+
+# Task constants
+CACHE_EXPIRATION = "cache_expiration"
+TASK_RUN_NAME = "task_run_name"
+REFRESH_CACHE = "refresh_cache"
+TASK_KEY = "task_key"
+
+# Flow run and task run common constants
+ID = "id"
+CREATED = "created"
+UPDATED = "updated"
+TAGS = "tags"
+ESTIMATED_RUN_TIME = "estimated_run_time"
+START_TIME = "start_time"
+END_TIME = "end_time"
+TOTAL_RUN_TIME = "total_run_time"
+NEXT_SCHEDULED_START_TIME = "next_scheduled_start_time"
+
+# Flow run constants
+CREATED_BY = "created_by"
+AUTO_SCHEDULED = "auto_scheduled"
+
+# Task run constants
+FLOW_RUN_ID = "flow_run_id"
+RUN_COUNT = "run_count"
+UPSTREAM_DEPENDENCIES = "upstream_dependencies"
+
+# States constants
+COMPLETE = "Completed"
+FAILED = "Failed"
+CANCELLED = "Cancelled"
+
+
+class DatahubEmitter(Block):
+    """
+    Block used to emit prefect task and flow related metadata to Datahub REST
+
+    Attributes:
+        datahub_rest_url Optional(str) : Datahub GMS Rest URL. \
+            Example: http://localhost:8080.
+        env Optional(str) : The environment that all assets produced by this \
+            orchestrator belong to. 
For more detail and possible values refer \
+            https://datahubproject.io/docs/graphql/enums/#fabrictype.
+        platform_instance Optional(str) : The instance of the platform that all assets \
+            produced by this recipe belong to. For more detail please refer to \
+            https://datahubproject.io/docs/platform-instances/.
+
+    Example:
+        Store value:
+        ```python
+        from prefect_datahub.datahub_emitter import DatahubEmitter
+        DatahubEmitter(
+            datahub_rest_url="http://localhost:8080",
+            env="PROD",
+            platform_instance="local_prefect"
+        ).save("BLOCK_NAME")
+        ```
+        Load a stored value:
+        ```python
+        from prefect_datahub.datahub_emitter import DatahubEmitter
+        block = DatahubEmitter.load("BLOCK_NAME")
+        ```
+    """
+
+    _block_type_name: Optional[str] = "datahub emitter"
+
+    datahub_rest_url: str = Field(
+        default="http://localhost:8080",
+        title="Datahub rest url",
+        description="Datahub GMS Rest URL. Example: http://localhost:8080.",
+    )
+
+    env: str = Field(
+        default="prod",
+        title="Environment",
+        description="The environment that all assets produced by this orchestrator "
+        "belong to. For more detail and possible values refer "
+        "https://datahubproject.io/docs/graphql/enums/#fabrictype.",
+    )
+
+    platform_instance: Optional[str] = Field(
+        default=None,
+        title="Platform instance",
+        description="The instance of the platform that all assets produced by this "
+        "recipe belong to. For more detail please refer to "
+        "https://datahubproject.io/docs/platform-instances/.",
+    )
+
+    def __init__(self, *args, **kwargs):
+        """
+        Initialize datahub rest emitter
+        """
+        super().__init__(*args, **kwargs)
+        self.datajobs_to_emit = {}
+        self.emitter = DatahubRestEmitter(gms_server=self.datahub_rest_url)
+        self.emitter.test_connection()
+
+    def _entities_to_urn_list(self, iolets: List[_Entity]) -> List[DatasetUrn]:
+        """
+        Convert list of _entity to list of dataset urn
+
+        Args:
+            iolets (list[_Entity]): The list of entities.
+
+        Returns:
+            The list of Dataset URN. 
+ """ + return [DatasetUrn.create_from_string(let.urn) for let in iolets] + + def _get_workspace(self) -> Optional[str]: + """ + Fetch workspace name if present in configured prefect api url. + + Returns: + The workspace name. + """ + try: + asyncio.run(cloud.get_cloud_client().api_healthcheck()) + except Exception: + get_run_logger().debug(traceback.format_exc()) + return None + if "workspaces" not in PREFECT_API_URL.value(): + get_run_logger().debug( + "Cannot fetch workspace name. Please login to prefect cloud using " + "command 'prefect cloud login'." + ) + return None + current_workspace_id = PREFECT_API_URL.value().split("/")[-1] + workspaces: List[Workspace] = asyncio.run( + cloud.get_cloud_client().read_workspaces() + ) + for workspace in workspaces: + if str(workspace.workspace_id) == current_workspace_id: + return workspace.workspace_name + return None + + async def _get_flow_run_graph(self, flow_run_id: str) -> Optional[List[Dict]]: + """ + Fetch the flow run graph for provided flow run id + + Args: + flow_run_id (str): The flow run id. + + Returns: + The flow run graph in json format. + """ + try: + response = await orchestration.get_client()._client.get( + f"/flow_runs/{flow_run_id}/graph" + ) + except Exception: + get_run_logger().debug(traceback.format_exc()) + return None + return response.json() + + def _emit_browsepath(self, urn: str, workspace_name: str) -> None: + """ + Emit browsepath for provided urn. Set path as orchestrator/env/workspace_name. 
+ + Args: + urn (str): The entity URN + workspace_name (str): The prefect cloud workspace name + """ + mcp = MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=BrowsePathsClass( + paths=[f"/{ORCHESTRATOR}/{self.env}/{workspace_name}"] + ), + ) + self.emitter.emit(mcp) + + def _generate_datajob( + self, + flow_run_ctx: FlowRunContext, + task_run_ctx: Optional[TaskRunContext] = None, + task_key: Optional[str] = None, + ) -> Optional[DataJob]: + """ + Create datajob entity using task run ctx and flow run ctx. + Assign description, tags, and properties to created datajob. + + Args: + flow_run_ctx (FlowRunContext): The prefect current running flow run context. + task_run_ctx (Optional[TaskRunContext]): The prefect current running task \ + run context. + task_key (Optional[str]): The task key. + + Returns: + The datajob entity. + """ + dataflow_urn = DataFlowUrn.create_from_ids( + orchestrator=ORCHESTRATOR, + flow_id=flow_run_ctx.flow.name, + env=self.env, + platform_instance=self.platform_instance, + ) + if task_run_ctx is not None: + datajob = DataJob( + id=task_run_ctx.task.task_key, + flow_urn=dataflow_urn, + name=task_run_ctx.task.name, + ) + + datajob.description = task_run_ctx.task.description + datajob.tags = task_run_ctx.task.tags + job_property_bag: Dict[str, str] = {} + + allowed_task_keys = [ + VERSION, + CACHE_EXPIRATION, + TASK_RUN_NAME, + RETRIES, + TIMEOUT_SECONDS, + LOG_PRINTS, + REFRESH_CACHE, + TASK_KEY, + ON_COMPLETION, + ON_FAILURE, + ] + for key in allowed_task_keys: + if ( + hasattr(task_run_ctx.task, key) + and getattr(task_run_ctx.task, key) is not None + ): + job_property_bag[key] = repr(getattr(task_run_ctx.task, key)) + datajob.properties = job_property_bag + return datajob + elif task_key is not None: + datajob = DataJob( + id=task_key, flow_urn=dataflow_urn, name=task_key.split(".")[-1] + ) + return datajob + return None + + def _generate_dataflow(self, flow_run_ctx: FlowRunContext) -> Optional[DataFlow]: + """ + Create dataflow entity 
using flow run ctx.
+        Assign description, tags, and properties to created dataflow.
+
+        Args:
+            flow_run_ctx (FlowRunContext): The prefect current running flow run context.
+
+        Returns:
+            The dataflow entity.
+        """
+        try:
+            flow: Flow = asyncio.run(
+                orchestration.get_client().read_flow(
+                    flow_id=flow_run_ctx.flow_run.flow_id
+                )
+            )
+        except Exception:
+            get_run_logger().debug(traceback.format_exc())
+            return None
+        assert flow
+
+        dataflow = DataFlow(
+            orchestrator=ORCHESTRATOR,
+            id=flow_run_ctx.flow.name,
+            env=self.env,
+            name=flow_run_ctx.flow.name,
+            platform_instance=self.platform_instance,
+        )
+        dataflow.description = flow_run_ctx.flow.description
+        dataflow.tags = set(flow.tags)
+        flow_property_bag: Dict[str, str] = {}
+        flow_property_bag[ID] = str(flow.id)
+        flow_property_bag[CREATED] = str(flow.created)
+        flow_property_bag[UPDATED] = str(flow.updated)
+
+        allowed_flow_keys = [
+            VERSION,
+            FLOW_RUN_NAME,
+            RETRIES,
+            TASK_RUNNER,
+            TIMEOUT_SECONDS,
+            PERSIST_RESULT,
+            LOG_PRINTS,
+            ON_COMPLETION,
+            ON_FAILURE,
+            ON_CANCELLATION,
+            ON_CRASHED,
+        ]
+        for key in allowed_flow_keys:
+            if (
+                hasattr(flow_run_ctx.flow, key)
+                and getattr(flow_run_ctx.flow, key) is not None
+            ):
+                flow_property_bag[key] = repr(getattr(flow_run_ctx.flow, key))
+        dataflow.properties = flow_property_bag
+
+        return dataflow
+
+    def _emit_tasks(
+        self,
+        flow_run_ctx: FlowRunContext,
+        dataflow: DataFlow,
+        workspace_name: Optional[str] = None,
+    ) -> None:
+        """
+        Emit prefect tasks metadata to datahub rest. Add upstream dependencies if
+        present for each task.
+
+        Args:
+            flow_run_ctx (FlowRunContext): The prefect current running flow run context
+            dataflow (DataFlow): The datahub dataflow entity.
+            workspace_name Optional(str): The prefect cloud workspace name. 
+ """ + graph_json = asyncio.run( + self._get_flow_run_graph(str(flow_run_ctx.flow_run.id)) + ) + if graph_json is None: + return + + task_run_key_map: Dict[str, str] = {} + for prefect_future in flow_run_ctx.task_run_futures: + if prefect_future.task_run is not None: + task_run_key_map[ + str(prefect_future.task_run.id) + ] = prefect_future.task_run.task_key + + get_run_logger().info("Emitting tasks to datahub...") + + for node in graph_json: + datajob_urn = DataJobUrn.create_from_ids( + data_flow_urn=str(dataflow.urn), + job_id=task_run_key_map[node[ID]], + ) + datajob: Optional[DataJob] = None + if str(datajob_urn) in self.datajobs_to_emit: + datajob = self.datajobs_to_emit[str(datajob_urn)] + else: + datajob = self._generate_datajob( + flow_run_ctx=flow_run_ctx, task_key=task_run_key_map[node[ID]] + ) + if datajob is not None: + for each in node[UPSTREAM_DEPENDENCIES]: + upstream_task_urn = DataJobUrn.create_from_ids( + data_flow_urn=str(dataflow.urn), + job_id=task_run_key_map[each[ID]], + ) + datajob.upstream_urns.extend([upstream_task_urn]) + datajob.emit(self.emitter) + + if workspace_name is not None: + self._emit_browsepath(str(datajob.urn), workspace_name) + + self._emit_task_run( + datajob=datajob, + flow_run_name=flow_run_ctx.flow_run.name, + task_run_id=UUID(node[ID]), + ) + + def _emit_flow_run(self, dataflow: DataFlow, flow_run_id: UUID) -> None: + """ + Emit prefect flow run to datahub rest. Prefect flow run get mapped with datahub + data process instance entity which get's generate from provided dataflow entity. + Assign flow run properties to data process instance properties. + + Args: + dataflow (DataFlow): The datahub dataflow entity used to create \ + data process instance. + flow_run_id (UUID): The prefect current running flow run id. 
+ """ + try: + flow_run: FlowRun = asyncio.run( + orchestration.get_client().read_flow_run(flow_run_id=flow_run_id) + ) + except Exception: + get_run_logger().debug(traceback.format_exc()) + return + assert flow_run + + if self.platform_instance is not None: + dpi_id = f"{self.platform_instance}.{flow_run.name}" + else: + dpi_id = flow_run.name + dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dpi_id) + + dpi_property_bag: Dict[str, str] = {} + allowed_flow_run_keys = [ + ID, + CREATED, + UPDATED, + CREATED_BY, + AUTO_SCHEDULED, + ESTIMATED_RUN_TIME, + START_TIME, + TOTAL_RUN_TIME, + NEXT_SCHEDULED_START_TIME, + TAGS, + RUN_COUNT, + ] + for key in allowed_flow_run_keys: + if hasattr(flow_run, key) and getattr(flow_run, key) is not None: + dpi_property_bag[key] = str(getattr(flow_run, key)) + dpi.properties.update(dpi_property_bag) + + if flow_run.start_time is not None: + dpi.emit_process_start( + emitter=self.emitter, + start_timestamp_millis=int(flow_run.start_time.timestamp() * 1000), + ) + + def _emit_task_run( + self, datajob: DataJob, flow_run_name: str, task_run_id: UUID + ) -> None: + """ + Emit prefect task run to datahub rest. Prefect task run get mapped with datahub + data process instance entity which get's generate from provided datajob entity. + Assign task run properties to data process instance properties. + + Args: + datajob (DataJob): The datahub datajob entity used to create \ + data process instance. + flow_run_name (str): The prefect current running flow run name. + task_run_id (str): The prefect task run id. 
+ """ + try: + task_run: TaskRun = asyncio.run( + orchestration.get_client().read_task_run(task_run_id=task_run_id) + ) + except Exception: + get_run_logger().debug(traceback.format_exc()) + return + assert task_run + + if self.platform_instance is not None: + dpi_id = f"{self.platform_instance}.{flow_run_name}.{task_run.name}" + else: + dpi_id = f"{flow_run_name}.{task_run.name}" + dpi = DataProcessInstance.from_datajob( + datajob=datajob, + id=dpi_id, + clone_inlets=True, + clone_outlets=True, + ) + + dpi_property_bag: Dict[str, str] = {} + allowed_task_run_keys = [ + ID, + FLOW_RUN_ID, + CREATED, + UPDATED, + ESTIMATED_RUN_TIME, + START_TIME, + END_TIME, + TOTAL_RUN_TIME, + NEXT_SCHEDULED_START_TIME, + TAGS, + RUN_COUNT, + ] + for key in allowed_task_run_keys: + if hasattr(task_run, key) and getattr(task_run, key) is not None: + dpi_property_bag[key] = str(getattr(task_run, key)) + dpi.properties.update(dpi_property_bag) + + state_result_map: Dict[str, InstanceRunResult] = { + COMPLETE: InstanceRunResult.SUCCESS, + FAILED: InstanceRunResult.FAILURE, + CANCELLED: InstanceRunResult.SKIPPED, + } + + if task_run.state_name not in state_result_map: + raise Exception( + f"State should be either complete, failed or cancelled and it was " + f"{task_run.state_name}" + ) + + result = state_result_map[task_run.state_name] + + if task_run.start_time is not None: + dpi.emit_process_start( + emitter=self.emitter, + start_timestamp_millis=int(task_run.start_time.timestamp() * 1000), + emit_template=False, + ) + + if task_run.end_time is not None: + dpi.emit_process_end( + emitter=self.emitter, + end_timestamp_millis=int(task_run.end_time.timestamp() * 1000), + result=result, + result_type=ORCHESTRATOR, + ) + + def add_task( + self, + inputs: Optional[List[_Entity]] = None, + outputs: Optional[List[_Entity]] = None, + ) -> None: + """ + Store prefect current running task metadata temporarily which later get emit + to datahub rest only if user calls emit_flow. 
Prefect task gets mapped with + datahub datajob entity. Assign provided inputs and outputs as datajob inlets + and outlets respectively. + + Args: + inputs (Optional[list]): The list of task inputs. + outputs (Optional[list]): The list of task outputs. + + Example: + Emit the task metadata as show below: + ```python + from prefect import flow, task + from prefect_datahub.dataset import Dataset + from prefect_datahub.datahub_emitter import DatahubEmitter + + datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME") + + @task(name="Transform", description="Transform the data") + def transform(data): + data = data.split(" ") + datahub_emitter.add_task( + inputs=[Dataset("snowflake", "mydb.schema.tableA")], + outputs=[Dataset("snowflake", "mydb.schema.tableC")], + ) + return data + + @flow(name="ETL flow", description="Extract transform load flow") + def etl(): + data = transform("This is data") + datahub_emitter.emit_flow() + ``` + """ + flow_run_ctx = FlowRunContext.get() + task_run_ctx = TaskRunContext.get() + assert flow_run_ctx + assert task_run_ctx + + datajob = self._generate_datajob( + flow_run_ctx=flow_run_ctx, task_run_ctx=task_run_ctx + ) + if datajob is not None: + if inputs is not None: + datajob.inlets.extend(self._entities_to_urn_list(inputs)) + if outputs is not None: + datajob.outlets.extend(self._entities_to_urn_list(outputs)) + self.datajobs_to_emit[str(datajob.urn)] = datajob + + def emit_flow(self) -> None: + """ + Emit prefect current running flow metadata to datahub rest. Prefect flow gets + mapped with datahub dataflow entity. If the user hasn't called add_task in + the task function still emit_flow will emit a task but without task name, + description,tags and properties. 
+ + + Example: + Emit the flow metadata as show below: + ```python + from prefect import flow, task + from prefect_datahub.datahub_emitter import DatahubEmitter + + datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME") + + @flow(name="ETL flow", description="Extract transform load flow") + def etl(): + data = extract() + data = transform(data) + load(data) + datahub_emitter.emit_flow() + ``` + """ + flow_run_ctx = FlowRunContext.get() + assert flow_run_ctx + + workspace_name = self._get_workspace() + + # Emit flow and flow run + get_run_logger().info("Emitting flow to datahub...") + dataflow = self._generate_dataflow(flow_run_ctx=flow_run_ctx) + + if dataflow is not None: + dataflow.emit(self.emitter) + + if workspace_name is not None: + self._emit_browsepath(str(dataflow.urn), workspace_name) + + self._emit_flow_run(dataflow, flow_run_ctx.flow_run.id) + + self._emit_tasks(flow_run_ctx, dataflow, workspace_name) diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/entities.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/entities.py new file mode 100644 index 0000000000000..e2711d0925d97 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/entities.py @@ -0,0 +1,46 @@ +from abc import abstractmethod +from typing import Optional + +import attr +import datahub.emitter.mce_builder as builder +from datahub.utilities.urns.urn import guess_entity_type + + +class _Entity: + @property + @abstractmethod + def urn(self) -> str: + pass + + +@attr.s(auto_attribs=True, str=True) +class Dataset(_Entity): + platform: str + name: str + env: str = builder.DEFAULT_ENV + platform_instance: Optional[str] = None + + @property + def urn(self): + return builder.make_dataset_urn_with_platform_instance( + platform=self.platform, + name=self.name, + platform_instance=self.platform_instance, + env=self.env, + ) + + +@attr.s(str=True) +class Urn(_Entity): + _urn: str = attr.ib() + + @_urn.validator + def _validate_urn(self, 
attribute, value): + if not value.startswith("urn:"): + raise ValueError("invalid urn provided: urns must start with 'urn:'") + if guess_entity_type(value) != "dataset": + raise ValueError("Datajob input/output currently only supports datasets") + + @property + def urn(self): + return self._urn diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/__init__.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/flow.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/flow.py new file mode 100644 index 0000000000000..d7ea7104f25ed --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/flow.py @@ -0,0 +1,32 @@ +from prefect import flow, task + +from prefect_datahub.datahub_emitter import DatahubEmitter +from prefect_datahub.entities import Dataset + +datahub_emitter = DatahubEmitter.load("datahub-block") + + +@task(name="Extract", description="Extract the data") +def extract(): + data = "This is data" + return data + + +@task(name="Transform", description="Transform the data") +def transform(data): + data = data.split(" ") + datahub_emitter.add_task( + inputs=[Dataset("snowflake", "mydb.schema.tableA")], + outputs=[Dataset("snowflake", "mydb.schema.tableC")], + ) + return data + + +@flow(name="ETL", description="Extract transform load flow") +def etl(): + data = extract() + data = transform(data) + datahub_emitter.emit_flow() + + +etl() diff --git a/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/save_block.py b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/save_block.py new file mode 100644 index 0000000000000..52140cf9842e2 --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/src/prefect_datahub/example/save_block.py @@ -0,0 +1,7 @@ +from 
prefect_datahub.datahub_emitter import DatahubEmitter + +DatahubEmitter( + datahub_rest_url="http://localhost:8080", + env="PROD", + platform_instance="local_prefect", +).save("datahub-block", overwrite=True) diff --git a/metadata-ingestion-modules/prefect-plugin/tests/integration/integration_test_dummy.py b/metadata-ingestion-modules/prefect-plugin/tests/integration/integration_test_dummy.py new file mode 100644 index 0000000000000..10cf3ad0a608a --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/tests/integration/integration_test_dummy.py @@ -0,0 +1,2 @@ +def test_dummy(): + pass diff --git a/metadata-ingestion-modules/prefect-plugin/tests/unit/test_block_standards.py b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_block_standards.py new file mode 100644 index 0000000000000..76794bc0fb27a --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_block_standards.py @@ -0,0 +1,45 @@ +import re +from typing import Type + +import pytest +from prefect.blocks.core import Block + +from prefect_datahub.datahub_emitter import DatahubEmitter + + +@pytest.mark.parametrize("block", [DatahubEmitter]) +class TestAllBlocksAdhereToStandards: + @pytest.fixture + def block(self, block): + return block + + def test_has_a_description(self, block: Type[Block]) -> None: + assert block.get_description() + + def test_all_fields_have_a_description(self, block: Type[Block]) -> None: + for name, field in block.__fields__.items(): + if Block.is_block_class(field.type_): + # TODO: Block field descriptions aren't currently handled by the UI, so block + # fields are currently excluded from this test. Once block field descriptions are + # supported by the UI, remove this clause. + continue + assert ( + field.field_info.description + ), f"{block.__name__} is missing a description on {name}" + assert field.field_info.description.endswith( + "." 
+ ), f"{name} description on {block.__name__} does not end with a period" + + def test_has_a_valid_code_example(self, block: Type[Block]) -> None: + code_example = block.get_code_example() + assert code_example is not None, f"{block.__name__} is missing a code example" + import_pattern = rf"from .* import {block.__name__}" + assert re.search(import_pattern, code_example) is not None, ( + f"The code example for {block.__name__} is missing an import statement" + f" matching the pattern {import_pattern}" + ) + block_load_pattern = rf'.* = {block.__name__}\.load\("BLOCK_NAME"\)' + assert re.search(block_load_pattern, code_example), ( + f"The code example for {block.__name__} is missing a .load statement" + f" matching the pattern {block_load_pattern}" + ) diff --git a/metadata-ingestion-modules/prefect-plugin/tests/unit/test_datahub_emitter.py b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_datahub_emitter.py new file mode 100644 index 0000000000000..52bdd10485c3c --- /dev/null +++ b/metadata-ingestion-modules/prefect-plugin/tests/unit/test_datahub_emitter.py @@ -0,0 +1,784 @@ +import asyncio +import json +import logging +from typing import Dict, List, Optional, cast +from unittest.mock import MagicMock, Mock, patch +from uuid import UUID + +import pytest +from datahub.api.entities.datajob import DataJob +from datahub.utilities.urns.dataset_urn import DatasetUrn +from prefect.client.schemas import FlowRun, TaskRun, Workspace +from prefect.futures import PrefectFuture +from prefect.server.schemas.core import Flow +from prefect.task_runners import SequentialTaskRunner +from requests.models import Response + +from prefect_datahub.datahub_emitter import DatahubEmitter +from prefect_datahub.entities import Dataset, _Entity + +mock_transform_task_json: Dict = { + "name": "transform", + "description": "Transform the actual data", + "task_key": "__main__.transform", + "tags": ["etl flow task"], +} +mock_extract_task_run_json: Dict = { + "id": 
"fa14a52b-d271-4c41-99cb-6b42ca7c070b", + "created": "2023-06-06T05:51:54.822707+00:00", + "updated": "2023-06-06T05:51:55.126000+00:00", + "name": "Extract-0", + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_key": "__main__.extract", + "dynamic_key": "0", + "cache_key": None, + "cache_expiration": None, + "task_version": None, + "empirical_policy": { + "max_retries": 0, + "retry_delay_seconds": 0.0, + "retries": 0, + "retry_delay": 0, + "retry_jitter_factor": None, + }, + "tags": [], + "state_id": "e280decd-2cc8-4428-a70f-149bcaf95b3c", + "task_inputs": {}, + "state_type": "COMPLETED", + "state_name": "Completed", + "run_count": 1, + "flow_run_run_count": 1, + "expected_start_time": "2023-06-06T05:51:54.822183+00:00", + "next_scheduled_start_time": None, + "start_time": "2023-06-06T05:51:55.016264+00:00", + "end_time": "2023-06-06T05:51:55.096534+00:00", + "total_run_time": 0.08027, + "estimated_run_time": 0.08027, + "estimated_start_time_delta": 0.194081, + "state": { + "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.096534+00:00", + "message": None, + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b", + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": False, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, +} +mock_transform_task_run_json: Dict = { + "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7", + "created": "2023-06-06T05:51:55.160372+00:00", + "updated": "2023-06-06T05:51:55.358000+00:00", + "name": "transform-0", + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_key": "__main__.transform", + "dynamic_key": "0", + "cache_key": None, + "cache_expiration": None, + "task_version": None, + 
"empirical_policy": { + "max_retries": 0, + "retry_delay_seconds": 0.0, + "retries": 0, + "retry_delay": 0, + "retry_jitter_factor": None, + }, + "tags": [], + "state_id": "971ad82e-6e5f-4691-abab-c900358e96c2", + "task_inputs": { + "actual_data": [ + {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"} + ] + }, + "state_type": "COMPLETED", + "state_name": "Completed", + "run_count": 1, + "flow_run_run_count": 1, + "expected_start_time": "2023-06-06T05:51:55.159416+00:00", + "next_scheduled_start_time": None, + "start_time": "2023-06-06T05:51:55.243159+00:00", + "end_time": "2023-06-06T05:51:55.332950+00:00", + "total_run_time": 0.089791, + "estimated_run_time": 0.089791, + "estimated_start_time_delta": 0.083743, + "state": { + "id": "971ad82e-6e5f-4691-abab-c900358e96c2", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.332950+00:00", + "message": None, + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7", + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": False, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, +} +mock_load_task_run_json: Dict = { + "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3", + "created": "2023-06-06T05:51:55.389823+00:00", + "updated": "2023-06-06T05:51:55.566000+00:00", + "name": "Load_task-0", + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_key": "__main__.load", + "dynamic_key": "0", + "cache_key": None, + "cache_expiration": None, + "task_version": None, + "empirical_policy": { + "max_retries": 0, + "retry_delay_seconds": 0.0, + "retries": 0, + "retry_delay": 0, + "retry_jitter_factor": None, + }, + "tags": [], + "state_id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4", + "task_inputs": { + "data": [ + {"input_type": 
"task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"} + ] + }, + "state_type": "COMPLETED", + "state_name": "Completed", + "run_count": 1, + "flow_run_run_count": 1, + "expected_start_time": "2023-06-06T05:51:55.389075+00:00", + "next_scheduled_start_time": None, + "start_time": "2023-06-06T05:51:55.461812+00:00", + "end_time": "2023-06-06T05:51:55.535954+00:00", + "total_run_time": 0.074142, + "estimated_run_time": 0.074142, + "estimated_start_time_delta": 0.072737, + "state": { + "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.535954+00:00", + "message": None, + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3", + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": True, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, +} +mock_flow_json: Dict = { + "id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b", + "created": "2023-06-02T12:31:10.988697+00:00", + "updated": "2023-06-02T12:31:10.988710+00:00", + "name": "etl", + "description": "Extract transform load flow", + "tags": [], +} +mock_flow_run_json: Dict = { + "id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "created": "2023-06-06T05:51:54.544266+00:00", + "updated": "2023-06-06T05:51:55.622000+00:00", + "name": "olivine-beagle", + "flow_id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b", + "state_id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e", + "deployment_id": None, + "work_queue_name": None, + "flow_version": "3ba54dfa31a7c9af4161aa4cd020a527", + "parameters": {}, + "idempotency_key": None, + "context": {}, + "empirical_policy": { + "max_retries": 0, + "retry_delay_seconds": 0.0, + "retries": 0, + "retry_delay": 0, + "pause_keys": [], + "resuming": False, + }, + "tags": [], + "parent_task_run_id": 
None, + "state_type": "COMPLETED", + "state_name": "Completed", + "run_count": 1, + "expected_start_time": "2023-06-06T05:51:54.543357+00:00", + "next_scheduled_start_time": None, + "start_time": "2023-06-06T05:51:54.750523+00:00", + "end_time": "2023-06-06T05:51:55.596446+00:00", + "total_run_time": 0.845923, + "estimated_run_time": 0.845923, + "estimated_start_time_delta": 0.207166, + "auto_scheduled": False, + "infrastructure_document_id": None, + "infrastructure_pid": None, + "created_by": None, + "work_pool_name": None, + "state": { + "id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.596446+00:00", + "message": "All states completed.", + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": None, + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": False, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, +} +mock_graph_json: List[Dict] = [ + { + "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b", + "name": "Extract-0", + "upstream_dependencies": [], + "state": { + "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.096534+00:00", + "message": None, + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b", + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": False, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, + "expected_start_time": "2023-06-06T05:51:54.822183+00:00", + "start_time": "2023-06-06T05:51:55.016264+00:00", + "end_time": 
"2023-06-06T05:51:55.096534+00:00", + "total_run_time": 0.08027, + "estimated_run_time": 0.08027, + "untrackable_result": False, + }, + { + "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3", + "name": "Load_task-0", + "upstream_dependencies": [ + {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"} + ], + "state": { + "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.535954+00:00", + "message": None, + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3", + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": True, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, + "expected_start_time": "2023-06-06T05:51:55.389075+00:00", + "start_time": "2023-06-06T05:51:55.461812+00:00", + "end_time": "2023-06-06T05:51:55.535954+00:00", + "total_run_time": 0.074142, + "estimated_run_time": 0.074142, + "untrackable_result": True, + }, + { + "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7", + "name": "transform-0", + "upstream_dependencies": [ + {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"} + ], + "state": { + "id": "971ad82e-6e5f-4691-abab-c900358e96c2", + "type": "COMPLETED", + "name": "Completed", + "timestamp": "2023-06-06T05:51:55.332950+00:00", + "message": None, + "data": {"type": "unpersisted"}, + "state_details": { + "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e", + "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7", + "child_flow_run_id": None, + "scheduled_time": None, + "cache_key": None, + "cache_expiration": None, + "untrackable_result": False, + "pause_timeout": None, + "pause_reschedule": False, + "pause_key": None, + "refresh_cache": None, + }, + }, + "expected_start_time": 
"2023-06-06T05:51:55.159416+00:00", + "start_time": "2023-06-06T05:51:55.243159+00:00", + "end_time": "2023-06-06T05:51:55.332950+00:00", + "total_run_time": 0.089791, + "estimated_run_time": 0.089791, + "untrackable_result": False, + }, +] +mock_workspace_json: Dict = { + "account_id": "33e98cfe-ad06-4ceb-a500-c11148499f75", + "account_name": "shubhamjagtapgslabcom", + "account_handle": "shubhamjagtapgslabcom", + "workspace_id": "157eb822-1b3b-4338-ae80-98edd5d00cb9", + "workspace_name": "datahub", + "workspace_description": "", + "workspace_handle": "datahub", +} + + +async def mock_task_run_future(): + extract_prefect_future: PrefectFuture = PrefectFuture( + name=mock_extract_task_run_json["name"], + key=UUID("4552629a-ac04-4590-b286-27642292739f"), + task_runner=SequentialTaskRunner(), + ) + extract_prefect_future.task_run = cast( + None, TaskRun.parse_obj(mock_extract_task_run_json) + ) + transform_prefect_future: PrefectFuture = PrefectFuture( + name=mock_transform_task_run_json["name"], + key=UUID("40fff3e5-5ef4-4b8b-9cc8-786f91bcc656"), + task_runner=SequentialTaskRunner(), + ) + transform_prefect_future.task_run = cast( + None, TaskRun.parse_obj(mock_transform_task_run_json) + ) + load_prefect_future: PrefectFuture = PrefectFuture( + name=mock_load_task_run_json["name"], + key=UUID("7565f596-9eb0-4330-ba34-963e7839883e"), + task_runner=SequentialTaskRunner(), + ) + load_prefect_future.task_run = cast( + None, TaskRun.parse_obj(mock_load_task_run_json) + ) + return [extract_prefect_future, transform_prefect_future, load_prefect_future] + + +@pytest.fixture(scope="module") +def mock_run_logger(): + with patch( + "prefect_datahub.datahub_emitter.get_run_logger", + return_value=logging.getLogger(), + ) as mock_logger: + yield mock_logger + + +@pytest.fixture(scope="module") +def mock_run_context(mock_run_logger): + task_run_ctx = MagicMock() + task_run_ctx.task.task_key = mock_transform_task_json["task_key"] + task_run_ctx.task.name = 
mock_transform_task_json["name"] + task_run_ctx.task.description = mock_transform_task_json["description"] + task_run_ctx.task.tags = mock_transform_task_json["tags"] + + flow_run_ctx = MagicMock() + flow_run_ctx.flow.name = mock_flow_json["name"] + flow_run_ctx.flow.description = mock_flow_json["description"] + flow_run_obj = FlowRun.parse_obj(mock_flow_run_json) + flow_run_ctx.flow_run.id = flow_run_obj.id + flow_run_ctx.flow_run.name = flow_run_obj.name + flow_run_ctx.flow_run.flow_id = flow_run_obj.flow_id + flow_run_ctx.flow_run.start_time = flow_run_obj.start_time + flow_run_ctx.task_run_futures = asyncio.run(mock_task_run_future()) + + with patch( + "prefect_datahub.datahub_emitter.TaskRunContext" + ) as mock_task_run_ctx, patch( + "prefect_datahub.datahub_emitter.FlowRunContext" + ) as mock_flow_run_ctx: + mock_task_run_ctx.get.return_value = task_run_ctx + mock_flow_run_ctx.get.return_value = flow_run_ctx + yield (task_run_ctx, flow_run_ctx) + + +async def mock_task_run(*args, **kwargs): + task_run_id = str(kwargs["task_run_id"]) + if task_run_id == "fa14a52b-d271-4c41-99cb-6b42ca7c070b": + return TaskRun.parse_obj(mock_extract_task_run_json) + elif task_run_id == "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7": + return TaskRun.parse_obj(mock_transform_task_run_json) + elif task_run_id == "f19f83ea-316f-4781-8cbe-1d5d8719afc3": + return TaskRun.parse_obj(mock_load_task_run_json) + return None + + +async def mock_flow(*args, **kwargs): + return Flow.parse_obj(mock_flow_json) + + +async def mock_flow_run(*args, **kwargs): + return FlowRun.parse_obj(mock_flow_run_json) + + +async def mock_flow_run_graph(*args, **kwargs): + response = Response() + response.status_code = 200 + response._content = json.dumps(mock_graph_json, separators=(",", ":")).encode( + "utf-8" + ) + return response + + +async def mock_api_healthcheck(*args, **kwargs): + return None + + +async def mock_read_workspaces(*args, **kwargs): + return [Workspace.parse_obj(mock_workspace_json)] + + 
@pytest.fixture(scope="module")
def mock_prefect_client():
    # A MagicMock Prefect client whose read_* methods are backed by the async
    # stubs above; patched in via the emitter module's `orchestration` handle.
    prefect_client_mock = MagicMock()
    prefect_client_mock.read_flow.side_effect = mock_flow
    prefect_client_mock.read_flow_run.side_effect = mock_flow_run
    prefect_client_mock.read_task_run.side_effect = mock_task_run
    # The graph endpoint is fetched through the client's private HTTP client.
    prefect_client_mock._client.get.side_effect = mock_flow_run_graph
    with patch("prefect_datahub.datahub_emitter.orchestration") as mock_client:
        mock_client.get_client.return_value = prefect_client_mock
        yield prefect_client_mock


@pytest.fixture(scope="module")
def mock_prefect_cloud_client():
    # A MagicMock Prefect Cloud client plus a patched PREFECT_API_URL.value
    # pointing at the mock account/workspace ids used in mock_workspace_json.
    prefect_cloud_client_mock = MagicMock()
    prefect_cloud_client_mock.api_healthcheck.side_effect = mock_api_healthcheck
    prefect_cloud_client_mock.read_workspaces.side_effect = mock_read_workspaces
    with patch("prefect_datahub.datahub_emitter.cloud") as mock_client, patch(
        "prefect_datahub.datahub_emitter.PREFECT_API_URL.value",
        return_value="https://api.prefect.cloud/api/accounts/33e98cfe-ad06-4ceb-"
        "a500-c11148499f75/workspaces/157eb822-1b3b-4338-ae80-98edd5d00cb9",
    ):
        mock_client.get_cloud_client.return_value = prefect_cloud_client_mock
        yield prefect_cloud_client_mock


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_entities_to_urn_list(mock_emit):
    # _entities_to_urn_list should map each Dataset entity to a DatasetUrn.
    dataset_urn_list = DatahubEmitter()._entities_to_urn_list(
        [Dataset("snowflake", "mydb.schema.tableA")]
    )
    for dataset_urn in dataset_urn_list:
        assert isinstance(dataset_urn, DatasetUrn)


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_get_flow_run_graph(mock_emit, mock_prefect_client):
    # _get_flow_run_graph should return the task-run graph as a list of dicts
    # (served here by mock_flow_run_graph via the patched client).
    graph_json = asyncio.run(
        DatahubEmitter()._get_flow_run_graph("c3b947e5-3fa1-4b46-a2e2-58d50c938f2e")
    )
    assert isinstance(graph_json, list)


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test__get_workspace(mock_emit, mock_prefect_cloud_client):
    # _get_workspace should resolve the workspace name from the cloud client;
    # "datahub" comes from mock_workspace_json["workspace_name"].
    workspace_name = DatahubEmitter()._get_workspace()
    assert workspace_name == "datahub"


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_add_task(mock_emit, mock_run_context):
    """add_task should register a DataJob (keyed by its dataJob urn) with the
    given inlets/outlets, and emit nothing yet (emission happens in emit_flow).
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    datahub_emitter = DatahubEmitter()
    inputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableA")]
    outputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableC")]
    datahub_emitter.add_task(
        inputs=inputs,
        outputs=outputs,
    )

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    # Urn derives from the mocked flow name ("etl") and the task's task_key.
    expected_datajob_urn = (
        f"urn:li:dataJob:(urn:li:dataFlow:"
        f"(prefect,{flow_run_ctx.flow.name},prod),{task_run_ctx.task.task_key})"
    )

    assert expected_datajob_urn in datahub_emitter.datajobs_to_emit.keys()
    actual_datajob = datahub_emitter.datajobs_to_emit[expected_datajob_urn]
    assert isinstance(actual_datajob, DataJob)
    assert str(actual_datajob.flow_urn) == "urn:li:dataFlow:(prefect,etl,prod)"
    assert actual_datajob.name == task_run_ctx.task.name
    assert actual_datajob.description == task_run_ctx.task.description
    assert actual_datajob.tags == task_run_ctx.task.tags
    assert (
        str(actual_datajob.inlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)"
    )
    assert (
        str(actual_datajob.outlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
    )
    # add_task must not emit anything by itself.
    assert mock_emit.emit.call_count == 0


@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_emit_flow(
    mock_emit, mock_run_context, mock_prefect_client, mock_prefect_cloud_client
):
    """End-to-end check of emit_flow: asserts the exact ordered sequence of
    aspects recorded on the mocked REST emitter — dataflow-level aspects first,
    then per-task (extract, load, transform) dataJob aspects and their
    dataProcessInstance aspects.

    NOTE(review): the assertions are positional (``method_calls[i]``); indices
    0 and 5-7 are not asserted here and their contents are not visible in this
    file.
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    platform_instance = "datahub_workspace"

    datahub_emitter = DatahubEmitter(platform_instance=platform_instance)
    datahub_emitter.add_task()
    datahub_emitter.emit_flow()

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    # platform_instance is prefixed onto the flow name in the dataFlow urn.
    expected_dataflow_urn = (
        f"urn:li:dataFlow:(prefect,{platform_instance}.{flow_run_ctx.flow.name},prod)"
    )

    # --- dataFlow-level aspects ---
    assert mock_emitter.method_calls[1][1][0].aspectName == "dataFlowInfo"
    assert mock_emitter.method_calls[1][1][0].entityUrn == expected_dataflow_urn
    assert mock_emitter.method_calls[2][1][0].aspectName == "ownership"
    assert mock_emitter.method_calls[2][1][0].entityUrn == expected_dataflow_urn
    assert mock_emitter.method_calls[3][1][0].aspectName == "globalTags"
    assert mock_emitter.method_calls[3][1][0].entityUrn == expected_dataflow_urn
    assert mock_emitter.method_calls[4][1][0].aspectName == "browsePaths"
    assert mock_emitter.method_calls[4][1][0].entityUrn == expected_dataflow_urn
    # --- flow-run dataProcessInstance aspects ---
    assert (
        mock_emitter.method_calls[8][1][0].aspectName == "dataProcessInstanceProperties"
    )
    assert (
        mock_emitter.method_calls[8][1][0].entityUrn
        == "urn:li:dataProcessInstance:a95d24db6abd98384fc1d4c8540098a4"
    )
    assert (
        mock_emitter.method_calls[9][1][0].aspectName
        == "dataProcessInstanceRelationships"
    )
    assert (
        mock_emitter.method_calls[9][1][0].entityUrn
        == "urn:li:dataProcessInstance:a95d24db6abd98384fc1d4c8540098a4"
    )
    assert (
        mock_emitter.method_calls[10][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[10][1][0].entityUrn
        == "urn:li:dataProcessInstance:a95d24db6abd98384fc1d4c8540098a4"
    )
    # --- extract task: dataJob aspects + its run instance ---
    assert mock_emitter.method_calls[11][1][0].aspectName == "dataJobInfo"
    assert (
        mock_emitter.method_calls[11][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
    )
    assert mock_emitter.method_calls[12][1][0].aspectName == "dataJobInputOutput"
    assert (
        mock_emitter.method_calls[12][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
    )
    assert mock_emitter.method_calls[13][1][0].aspectName == "ownership"
    assert (
        mock_emitter.method_calls[13][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
    )
    assert mock_emitter.method_calls[14][1][0].aspectName == "globalTags"
    assert (
        mock_emitter.method_calls[14][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
    )
    assert mock_emitter.method_calls[15][1][0].aspectName == "browsePaths"
    assert (
        mock_emitter.method_calls[15][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)"
    )
    assert (
        mock_emitter.method_calls[16][1][0].aspectName
        == "dataProcessInstanceProperties"
    )
    assert (
        mock_emitter.method_calls[16][1][0].entityUrn
        == "urn:li:dataProcessInstance:bf5eab177af0097bbff6a41694f39af9"
    )
    assert (
        mock_emitter.method_calls[17][1][0].aspectName
        == "dataProcessInstanceRelationships"
    )
    assert (
        mock_emitter.method_calls[17][1][0].entityUrn
        == "urn:li:dataProcessInstance:bf5eab177af0097bbff6a41694f39af9"
    )
    assert (
        mock_emitter.method_calls[18][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[18][1][0].entityUrn
        == "urn:li:dataProcessInstance:bf5eab177af0097bbff6a41694f39af9"
    )
    assert (
        mock_emitter.method_calls[19][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[19][1][0].entityUrn
        == "urn:li:dataProcessInstance:bf5eab177af0097bbff6a41694f39af9"
    )
    # --- load task: dataJob aspects + its run instance ---
    assert mock_emitter.method_calls[20][1][0].aspectName == "dataJobInfo"
    assert (
        mock_emitter.method_calls[20][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
    )
    assert mock_emitter.method_calls[21][1][0].aspectName == "dataJobInputOutput"
    assert (
        mock_emitter.method_calls[21][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
    )
    assert mock_emitter.method_calls[22][1][0].aspectName == "ownership"
    assert (
        mock_emitter.method_calls[22][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
    )
    assert mock_emitter.method_calls[23][1][0].aspectName == "globalTags"
    assert (
        mock_emitter.method_calls[23][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
    )
    assert mock_emitter.method_calls[24][1][0].aspectName == "browsePaths"
    assert (
        mock_emitter.method_calls[24][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)"
    )
    assert (
        mock_emitter.method_calls[25][1][0].aspectName
        == "dataProcessInstanceProperties"
    )
    assert (
        mock_emitter.method_calls[25][1][0].entityUrn
        == "urn:li:dataProcessInstance:095673536b61e6f25c7691af0d2cc317"
    )
    assert (
        mock_emitter.method_calls[26][1][0].aspectName
        == "dataProcessInstanceRelationships"
    )
    assert (
        mock_emitter.method_calls[26][1][0].entityUrn
        == "urn:li:dataProcessInstance:095673536b61e6f25c7691af0d2cc317"
    )
    assert (
        mock_emitter.method_calls[27][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[27][1][0].entityUrn
        == "urn:li:dataProcessInstance:095673536b61e6f25c7691af0d2cc317"
    )
    assert (
        mock_emitter.method_calls[28][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[28][1][0].entityUrn
        == "urn:li:dataProcessInstance:095673536b61e6f25c7691af0d2cc317"
    )
    # --- transform task: dataJob aspects + its run instance ---
    assert mock_emitter.method_calls[29][1][0].aspectName == "dataJobInfo"
    assert (
        mock_emitter.method_calls[29][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    )
    assert mock_emitter.method_calls[30][1][0].aspectName == "dataJobInputOutput"
    assert (
        mock_emitter.method_calls[30][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    )
    assert mock_emitter.method_calls[31][1][0].aspectName == "ownership"
    assert (
        mock_emitter.method_calls[31][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    )
    assert mock_emitter.method_calls[32][1][0].aspectName == "globalTags"
    assert (
        mock_emitter.method_calls[32][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    )
    # The transform task is the only one with tags (from the mocked task ctx).
    assert (
        mock_emitter.method_calls[32][1][0].aspect.tags[0].tag
        == f"urn:li:tag:{task_run_ctx.task.tags[0]}"
    )
    assert mock_emitter.method_calls[33][1][0].aspectName == "browsePaths"
    assert (
        mock_emitter.method_calls[33][1][0].entityUrn
        == f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)"
    )
    assert (
        mock_emitter.method_calls[34][1][0].aspectName
        == "dataProcessInstanceProperties"
    )
    assert (
        mock_emitter.method_calls[34][1][0].entityUrn
        == "urn:li:dataProcessInstance:04ba0f8064b2c45f69da571c434f1c69"
    )
    assert (
        mock_emitter.method_calls[35][1][0].aspectName
        == "dataProcessInstanceRelationships"
    )
    assert (
        mock_emitter.method_calls[35][1][0].entityUrn
        == "urn:li:dataProcessInstance:04ba0f8064b2c45f69da571c434f1c69"
    )
    assert (
        mock_emitter.method_calls[36][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[36][1][0].entityUrn
        == "urn:li:dataProcessInstance:04ba0f8064b2c45f69da571c434f1c69"
    )
    assert (
        mock_emitter.method_calls[37][1][0].aspectName == "dataProcessInstanceRunEvent"
    )
    assert (
        mock_emitter.method_calls[37][1][0].entityUrn
        == "urn:li:dataProcessInstance:04ba0f8064b2c45f69da571c434f1c69"
    )
diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md
index 47e325171ddcc..0955c0b287231 100644
--- a/metadata-ingestion/developing.md
+++ b/metadata-ingestion/developing.md
@@ -35,7 +35,16 @@ cd metadata-ingestion-modules/airflow-plugin
 source venv/bin/activate
 datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
 ```
+### (Optional) Set up your Python environment for developing on Prefect Plugin
+From the repository root:
+
+```shell
+cd metadata-ingestion-modules/prefect-plugin
+../../gradlew :metadata-ingestion-modules:prefect-plugin:installDev
+source venv/bin/activate
+datahub version # should print "DataHub
CLI version: unavailable (installed in develop mode)" +``` ### Common setup issues Common issues (click to expand): diff --git a/settings.gradle b/settings.gradle index 4614b6ed4ccaf..ea5080c9de1b6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -55,6 +55,7 @@ include 'metadata-integration:java:datahub-client' include 'metadata-integration:java:datahub-protobuf' include 'ingestion-scheduler' include 'metadata-ingestion-modules:airflow-plugin' +include 'metadata-ingestion-modules:prefect-plugin' include 'smoke-test' include 'metadata-auth:auth-api' include 'metadata-service:schema-registry-api'