diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index b0666f4a42aac..3eb34eca85a46 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -91,6 +91,8 @@ jobs: -x :metadata-ingestion-modules:airflow-plugin:check \ -x :metadata-ingestion-modules:dagster-plugin:build \ -x :metadata-ingestion-modules:dagster-plugin:check \ + -x :metadata-ingestion-modules:gx-plugin:build \ + -x :metadata-ingestion-modules:gx-plugin:check \ -x :datahub-frontend:build \ -x :datahub-web-react:build \ --parallel diff --git a/.github/workflows/gx-plugin.yml b/.github/workflows/gx-plugin.yml new file mode 100644 index 0000000000000..84ba2e0559be1 --- /dev/null +++ b/.github/workflows/gx-plugin.yml @@ -0,0 +1,87 @@ +name: GX Plugin +on: + push: + branches: + - master + paths: + - ".github/workflows/gx-plugin.yml" + - "metadata-ingestion-modules/gx-plugin/**" + - "metadata-ingestion/**" + - "metadata-models/**" + pull_request: + branches: + - master + paths: + - ".github/**" + - "metadata-ingestion-modules/gx-plugin/**" + - "metadata-ingestion/**" + - "metadata-models/**" + release: + types: [published] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + gx-plugin: + runs-on: ubuntu-latest + env: + SPARK_VERSION: 3.0.3 + DATAHUB_TELEMETRY_ENABLED: false + strategy: + matrix: + python-version: ["3.8", "3.10"] + include: + - python-version: "3.8" + extraPythonRequirement: "great-expectations~=0.15.12" + - python-version: "3.10" + extraPythonRequirement: "great-expectations~=0.16.0 numpy~=1.26.0" + - python-version: "3.11" + extraPythonRequirement: "great-expectations~=0.17.0" + fail-fast: false + steps: + - name: Set up JDK 17 + uses: actions/setup-java@v3 + with: + distribution: "zulu" + java-version: 17 + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + - name: Install dependencies + run: ./metadata-ingestion/scripts/install_deps.sh + - name: Install GX package and test (extras ${{ matrix.extraPythonRequirement }}) + run: ./gradlew -Pextra_pip_requirements='${{ matrix.extraPythonRequirement }}' :metadata-ingestion-modules:gx-plugin:lint :metadata-ingestion-modules:gx-plugin:testQuick + - name: pip freeze show list installed + if: always() + run: source metadata-ingestion-modules/gx-plugin/venv/bin/activate && pip freeze + - uses: actions/upload-artifact@v3 + if: ${{ always() && matrix.python-version == '3.11' && matrix.extraPythonRequirement == 'great-expectations~=0.17.0' }} + with: + name: Test Results (GX Plugin ${{ matrix.python-version}}) + path: | + **/build/reports/tests/test/** + **/build/test-results/test/** + **/junit.*.xml + - name: Upload coverage to Codecov + if: always() + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + directory: . 
+ fail_ci_if_error: false + flags: gx-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }} + name: pytest-gx + verbose: true + + event-file: + runs-on: ubuntu-latest + steps: + - name: Upload + uses: actions/upload-artifact@v3 + with: + name: Event File + path: ${{ github.event_path }} diff --git a/.github/workflows/test-results.yml b/.github/workflows/test-results.yml index a122ef3835f4d..947fc35f169a0 100644 --- a/.github/workflows/test-results.yml +++ b/.github/workflows/test-results.yml @@ -2,7 +2,7 @@ name: Test Results on: workflow_run: - workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin"] + workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "GX Plugin"] types: - completed diff --git a/docs-website/build.gradle b/docs-website/build.gradle index 798047a562ffd..803112bf85716 100644 --- a/docs-website/build.gradle +++ b/docs-website/build.gradle @@ -86,6 +86,7 @@ task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall, ':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel', ':metadata-ingestion-modules:dagster-plugin:buildWheel', + ':metadata-ingestion-modules:gx-plugin:buildWheel', ]) { inputs.files(projectMdFiles) outputs.cacheIf { true } diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts index 23888d9000161..ceac79bd5cad3 100644 --- a/docs-website/generateDocsDir.ts +++ b/docs-website/generateDocsDir.ts @@ -573,6 +573,7 @@ function copy_python_wheels(): void { "../metadata-ingestion/dist", "../metadata-ingestion-modules/airflow-plugin/dist", "../metadata-ingestion-modules/dagster-plugin/dist", + "../metadata-ingestion-modules/gx-plugin/dist", ]; const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels"); diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 1f9c0a4d79a9d..835263fb8872f 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -917,6 +917,7 @@ module.exports = { // "metadata-integration/java/openlineage-converter/README" //"metadata-ingestion-modules/airflow-plugin/README" //"metadata-ingestion-modules/dagster-plugin/README" + //"metadata-ingestion-modules/gx-plugin/README" // "metadata-ingestion/schedule_docs/datahub", // we can delete this // TODO: change the titles of these, removing the "What is..." portion from the sidebar" // "docs/what/entity", diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 2401b169cd660..80d2efd3ed164 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -16,7 +16,9 @@ def get_long_description(): _version: str = package_metadata["__version__"] _self_pin = ( - f"=={_version}" if not (_version.endswith("dev0") or "docker" in _version) else "" + f"=={_version}" + if not (_version.endswith(("dev0", "dev1")) or "docker" in _version) + else "" ) diff --git a/metadata-ingestion-modules/dagster-plugin/build.gradle b/metadata-ingestion-modules/dagster-plugin/build.gradle index 6cb7b9295549a..74ca7cedea3a5 100644 --- a/metadata-ingestion-modules/dagster-plugin/build.gradle +++ b/metadata-ingestion-modules/dagster-plugin/build.gradle @@ -33,7 +33,7 @@ task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingesti outputs.file(sentinel_file) commandLine 'bash', '-c', "source ${venv_name}/bin/activate && set -x && " + - "uv pip install -e . ${extra_pip_requirements} && " + + "${pip_install_command} -e . 
${extra_pip_requirements} && " + "touch ${sentinel_file}" } @@ -45,15 +45,11 @@ task installDev(type: Exec, dependsOn: [install]) { outputs.file(sentinel_file) commandLine 'bash', '-c', "source ${venv_name}/bin/activate && set -x && " + - "uv pip install -e .[dev] ${extra_pip_requirements} && " + + "${pip_install_command} -e .[dev] ${extra_pip_requirements} && " + "touch ${sentinel_file}" } task lint(type: Exec, dependsOn: installDev) { - /* - The find/sed combo below is a temporary work-around for the following mypy issue with airflow 2.2.0: - "venv/lib/python3.8/site-packages/airflow/_vendor/connexion/spec.py:169: error: invalid syntax". - */ commandLine 'bash', '-c', "source ${venv_name}/bin/activate && set -x && " + "black --check --diff src/ tests/ examples/ && " + @@ -77,7 +73,7 @@ task installDevTest(type: Exec, dependsOn: [installDev]) { outputs.file(sentinel_file) commandLine 'bash', '-c', "source ${venv_name}/bin/activate && set -x && " + - "uv pip install -e .[dev,integration-tests] ${extra_pip_requirements} && " + + "${pip_install_command} -e .[dev,integration-tests] ${extra_pip_requirements} && " + "touch ${sentinel_file}" } @@ -105,10 +101,6 @@ task testQuick(type: Exec, dependsOn: installDevTest) { } -task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) { - commandLine 'bash', '-x', '-c', - "source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml" -} task buildWheel(type: Exec, dependsOn: [environmentSetup]) { commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " + 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh' diff --git a/metadata-ingestion-modules/dagster-plugin/setup.py b/metadata-ingestion-modules/dagster-plugin/setup.py index 8a2a1d76d345b..bf9fcf09a66bc 100644 --- a/metadata-ingestion-modules/dagster-plugin/setup.py +++ b/metadata-ingestion-modules/dagster-plugin/setup.py @@ -17,7 +17,9 @@ def get_long_description(): _version: str = package_metadata["__version__"] _self_pin = ( - f"=={_version}" if not (_version.endswith("dev0") or "docker" in _version) else "" + f"=={_version}" + if not (_version.endswith(("dev0", "dev1")) or "docker" in _version) + else "" ) base_requirements = { @@ -25,9 +27,7 @@ def get_long_description(): "dagster >= 1.3.3", "dagit >= 1.3.3", *rest_common, - # Ignoring the dependency below because it causes issues with the vercel built wheel install - # f"acryl-datahub[datahub-rest]{_self_pin}", - "acryl-datahub[datahub-rest]", + f"acryl-datahub[datahub-rest]{_self_pin}", } mypy_stubs = { diff --git a/metadata-ingestion-modules/gx-plugin/.gitignore b/metadata-ingestion-modules/gx-plugin/.gitignore new file mode 100644 index 0000000000000..8c01744589e35 --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/.gitignore @@ -0,0 +1,143 @@ +.envrc +src/datahub_gx_plugin/__init__.py.bak +.vscode/ +output +pvenv36/ +bq_credentials.json +/tmp +*.bak + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Generated classes +src/datahub/metadata/ +wheels/ +junit.quick.xml diff --git a/metadata-ingestion-modules/gx-plugin/README.md b/metadata-ingestion-modules/gx-plugin/README.md new file mode 100644 index 0000000000000..1ffd87a955432 --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/README.md @@ -0,0 +1,4 @@ +# Datahub GX Plugin + +See the DataHub GX docs for details. + diff --git a/metadata-ingestion-modules/gx-plugin/build.gradle b/metadata-ingestion-modules/gx-plugin/build.gradle new file mode 100644 index 0000000000000..f1adbc6676e5b --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/build.gradle @@ -0,0 +1,123 @@ +plugins { + id 'base' +} + +ext { + python_executable = 'python3' + venv_name = 'venv' +} + +if (!project.hasProperty("extra_pip_requirements")) { + ext.extra_pip_requirements = "" +} + +def pip_install_command = "VIRTUAL_ENV=${venv_name} ${venv_name}/bin/uv pip install -e ../../metadata-ingestion" + +task checkPythonVersion(type: Exec) { + commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 8)' +} + +task environmentSetup(type: Exec, dependsOn: checkPythonVersion) { + def sentinel_file = "${venv_name}/.venv_environment_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "${python_executable} -m venv ${venv_name} && " + + "${venv_name}/bin/python -m pip install --upgrade pip uv wheel 'setuptools>=63.0.0' && " + + "touch ${sentinel_file}" +} + +task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) { + def sentinel_file = "${venv_name}/.build_install_package_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "${pip_install_command} -e . 
${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +task install(dependsOn: [installPackage]) + +task installDev(type: Exec, dependsOn: [install]) { + def sentinel_file = "${venv_name}/.build_install_dev_sentinel" + inputs.file file('setup.py') + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "${pip_install_command} -e .[dev] ${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +task lint(type: Exec, dependsOn: installDev) { + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "black --check --diff src/ tests/ && " + + "isort --check --diff src/ tests/ && " + + "flake8 --count --statistics src/ tests/ && " + + "mypy --show-traceback --show-error-codes src/ tests/" +} +task lintFix(type: Exec, dependsOn: installDev) { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && " + + "black src/ tests/ && " + + "isort src/ tests/ && " + + "flake8 src/ tests/ && " + + "mypy src/ tests/" +} + +task installDevTest(type: Exec, dependsOn: [installDev]) { + def sentinel_file = "${venv_name}/.build_install_dev_test_sentinel" + inputs.file file('setup.py') + outputs.dir("${venv_name}") + outputs.file(sentinel_file) + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && set -x && " + + "${pip_install_command} -e .[dev,integration-tests] ${extra_pip_requirements} && " + + "touch ${sentinel_file}" +} + +def testFile = hasProperty('testFile') ? testFile : 'unknown' +task testSingle(dependsOn: [installDevTest]) { + doLast { + if (testFile != 'unknown') { + exec { + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest ${testFile}" + } + } else { + throw new GradleException("No file provided. Use -PtestFile=") + } + } +} + +task testQuick(type: Exec, dependsOn: installDevTest) { + // We can't enforce the coverage requirements if we run a subset of the tests. + inputs.files(project.fileTree(dir: "src/", include: "**/*.py")) + inputs.files(project.fileTree(dir: "tests/")) + outputs.dir("${venv_name}") + commandLine 'bash', '-x', '-c', + "source ${venv_name}/bin/activate && pytest -vv --continue-on-collection-errors --junit-xml=junit.quick.xml" +} + + +task buildWheel(type: Exec, dependsOn: [environmentSetup]) { + commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " + + 'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh' +} + +task cleanPythonCache(type: Exec) { + commandLine 'bash', '-c', + "find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete" +} + +build.dependsOn install +check.dependsOn lint +check.dependsOn testQuick + +clean { + delete venv_name + delete 'build' + delete 'dist' +} +clean.dependsOn cleanPythonCache diff --git a/metadata-ingestion-modules/gx-plugin/pyproject.toml b/metadata-ingestion-modules/gx-plugin/pyproject.toml new file mode 100644 index 0000000000000..fba81486b9f67 --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools>=54.0.0", "wheel", "pip>=21.0.0"] + +[tool.black] +extend-exclude = ''' +# A regex preceded with ^/ will apply only to files and directories +# in the root of the project. 
+^/tmp +''' +include = '\.pyi?$' + +[tool.isort] +indent = ' ' +profile = 'black' +sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER' + +[tool.pyright] +extraPaths = ['tests'] \ No newline at end of file diff --git a/metadata-ingestion-modules/gx-plugin/scripts/release.sh b/metadata-ingestion-modules/gx-plugin/scripts/release.sh new file mode 100755 index 0000000000000..058add495821c --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/scripts/release.sh @@ -0,0 +1,26 @@ +#!/bin/bash +set -euxo pipefail + +if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then + ../../gradlew build # also runs tests +elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then + ../../gradlew install +fi + +MODULE=datahub_gx_plugin + +# Check packaging constraint. +python -c 'import setuptools; where="./src"; assert setuptools.find_packages(where) == setuptools.find_namespace_packages(where), "you seem to be missing or have extra __init__.py files"' +if [[ ${RELEASE_VERSION:-} ]]; then + # Replace version with RELEASE_VERSION env variable + sed -i.bak "s/__version__ = \"1\!0.0.0.dev0\"/__version__ = \"$(echo $RELEASE_VERSION|sed s/-/+/)\"/" src/${MODULE}/__init__.py +else + vim src/${MODULE}/__init__.py +fi + +rm -rf build dist || true +python -m build +if [[ ! ${RELEASE_SKIP_UPLOAD:-} ]]; then + python -m twine upload 'dist/*' +fi +mv src/${MODULE}/__init__.py.bak src/${MODULE}/__init__.py diff --git a/metadata-ingestion-modules/gx-plugin/setup.cfg b/metadata-ingestion-modules/gx-plugin/setup.cfg new file mode 100644 index 0000000000000..bbdd85f0fdc4e --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/setup.cfg @@ -0,0 +1,71 @@ +[flake8] +max-complexity = 15 +ignore = + # Ignore: line length issues, since black's formatter will take care of them. + E501, + # Ignore: 1 blank line required before class docstring. + D203, + # See https://stackoverflow.com/a/57074416. + W503, + # See https://github.com/psf/black/issues/315. + E203 +exclude = + .git, + venv, + .tox, + __pycache__ +per-file-ignores = + # imported but unused + __init__.py: F401 +ban-relative-imports = true + +[mypy] +plugins = + pydantic.mypy +exclude = ^(venv|build|dist)/ +ignore_missing_imports = yes +strict_optional = yes +check_untyped_defs = yes +disallow_incomplete_defs = yes +disallow_untyped_decorators = yes +warn_unused_configs = yes +# eventually we'd like to enable these +disallow_untyped_defs = no + +# try to be a bit more strict in certain areas of the codebase +[mypy-datahub.*] +ignore_missing_imports = no +[mypy-tests.*] +ignore_missing_imports = no + +[tool:pytest] +asyncio_mode = auto +addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers +markers = + integration: marks all integration tests, across all batches (deselect with '-m "not integration"') +testpaths = + tests/unit + tests/integration + +[coverage:run] +# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov, +# and tox interact, we should not uncomment the following line. +# See https://pytest-cov.readthedocs.io/en/latest/config.html and +# https://coverage.readthedocs.io/en/coverage-5.0/config.html. +# We also have some additional pytest/cov config options in tox.ini. +# source = src + +[coverage:paths] +# This is necessary for tox-based coverage to be counted properly. +source = + src + */site-packages + +[coverage:report] +# The fail_under value ensures that at least some coverage data is collected. +# We override its value in the tox config. 
+show_missing = true +exclude_lines = + pragma: no cover + @abstract + if TYPE_CHECKING: diff --git a/metadata-ingestion-modules/gx-plugin/setup.py b/metadata-ingestion-modules/gx-plugin/setup.py new file mode 100644 index 0000000000000..1584111f820f5 --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/setup.py @@ -0,0 +1,157 @@ +import os +import pathlib + +import setuptools + +package_metadata: dict = {} +with open("./src/datahub_gx_plugin/__init__.py") as fp: + exec(fp.read(), package_metadata) + + +def get_long_description(): + root = os.path.dirname(__file__) + return pathlib.Path(os.path.join(root, "README.md")).read_text() + + +rest_common = {"requests", "requests_file"} + +# TODO: Can we move away from sqllineage and use sqlglot ?? +sqllineage_lib = { + "sqllineage==1.3.8", + # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage. + # There have previously been issues from not pinning sqlparse, so it's best to pin it. + # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360 + "sqlparse==0.4.4", +} + +_version: str = package_metadata["__version__"] +_self_pin = ( + f"=={_version}" + if not (_version.endswith(("dev0", "dev1")) or "docker" in _version) + else "" +) + +base_requirements = { + # Actual dependencies. + # This is temporary lower bound that we're open to loosening/tightening as requirements show up + "sqlalchemy>=1.4.39, <2", + # GE added handling for higher version of jinja2 in version 0.15.12 + # https://github.com/great-expectations/great_expectations/pull/5382/files + # TODO: support GX 0.18.0 + "great-expectations>=0.15.12, <0.18.0", + # datahub does not depend on traitlets directly but great expectations does. + # https://github.com/ipython/traitlets/issues/741 + "traitlets<5.2.2", + *rest_common, + *sqllineage_lib, + f"acryl-datahub[datahub-rest]{_self_pin}", +} + +mypy_stubs = { + "types-dataclasses", + "sqlalchemy-stubs", + "types-setuptools", + "types-six", + "types-python-dateutil", + "types-requests", + "types-toml", + "types-PyYAML", + "types-freezegun", + "types-cachetools", + # versions 0.1.13 and 0.1.14 seem to have issues + "types-click==0.1.12", + "types-tabulate", + # avrogen package requires this + "types-pytz", +} + +base_dev_requirements = { + *base_requirements, + *mypy_stubs, + "black==22.12.0", + "coverage>=5.1", + "flake8>=6.0.0", + "flake8-tidy-imports>=4.3.0", + "flake8-bugbear==23.3.12", + "isort>=5.7.0", + "mypy>=1.4.0", + # pydantic 1.8.2 is incompatible with mypy 0.910. + # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910. + "pydantic>=1.10.0,!=1.10.3", + "pytest>=6.2.2", + "pytest-asyncio>=0.16.0", + "pytest-cov>=2.8.1", + "tox", + "deepdiff", + "requests-mock", + "freezegun", + "jsonpickle", + "build", + "twine", + "packaging", +} + +dev_requirements = { + *base_dev_requirements, +} + +integration_test_requirements = { + *dev_requirements, + "psycopg2-binary", + "pyspark", + f"acryl-datahub[testing-utils]{_self_pin}", + "pytest-docker>=1.1.0", +} + +entry_points = { + "gx.plugins": "acryl-datahub-gx-plugin = datahub_gx_plugin.action:DataHubValidationAction" +} + + +setuptools.setup( + # Package metadata. 
+    name=package_metadata["__package_name__"],
+    version=package_metadata["__version__"],
+    url="https://datahubproject.io/",
+    project_urls={
+        "Documentation": "https://datahubproject.io/docs/",
+        "Source": "https://github.com/datahub-project/datahub",
+        "Changelog": "https://github.com/datahub-project/datahub/releases",
+    },
+    license="Apache License 2.0",
+    description="Datahub GX plugin to capture executions and send to Datahub",
+    long_description=get_long_description(),
+    long_description_content_type="text/markdown",
+    classifiers=[
+        "Development Status :: 5 - Production/Stable",
+        "Programming Language :: Python",
+        "Programming Language :: Python :: 3",
+        "Programming Language :: Python :: 3 :: Only",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+        "Intended Audience :: Developers",
+        "Intended Audience :: Information Technology",
+        "Intended Audience :: System Administrators",
+        "License :: OSI Approved",
+        "License :: OSI Approved :: Apache Software License",
+        "Operating System :: Unix",
+        "Operating System :: POSIX :: Linux",
+        "Environment :: Console",
+        "Environment :: MacOS X",
+        "Topic :: Software Development",
+    ],
+    # Package info.
+    zip_safe=False,
+    python_requires=">=3.8",
+    package_dir={"": "src"},
+    packages=setuptools.find_namespace_packages(where="./src"),
+    entry_points=entry_points,
+    # Dependencies.
+    install_requires=list(base_requirements),
+    extras_require={
+        "ignore": [],  # This is a dummy extra to allow for trailing commas in the list.
+        "dev": list(dev_requirements),
+        "integration-tests": list(integration_test_requirements),
+    },
+)
diff --git a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/__init__.py b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/__init__.py
new file mode 100644
index 0000000000000..a7689be82a5d9
--- /dev/null
+++ b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/__init__.py
@@ -0,0 +1,21 @@
+# Published at https://pypi.org/project/acryl-datahub-gx-plugin/.
+__package_name__ = "acryl-datahub-gx-plugin" +__version__ = "1!0.0.0.dev0" + + +def is_dev_mode() -> bool: + return __version__.endswith("dev0") + + +def nice_version_name() -> str: + if is_dev_mode(): + return "unavailable (installed in develop mode)" + return __version__ + + +def get_provider_info(): + return { + "package-name": f"{__package_name__}", + "name": f"{__package_name__}", + "description": "Datahub metadata collector plugin", + } diff --git a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py new file mode 100644 index 0000000000000..76e43cf8c2c3d --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py @@ -0,0 +1,871 @@ +import json +import logging +import sys +import time +from dataclasses import dataclass +from datetime import timezone +from decimal import Decimal +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +import datahub.emitter.mce_builder as builder +from datahub.cli.env_utils import get_boolean_env_variable +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.emitter.serialization_helper import pre_json_transform +from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( + get_platform_from_sqlalchemy_uri, +) +from datahub.metadata.com.linkedin.pegasus2avro.assertion import ( + AssertionInfo, + AssertionResult, + AssertionResultType, + AssertionRunEvent, + AssertionRunStatus, + AssertionStdAggregation, + AssertionStdOperator, + AssertionStdParameter, + AssertionStdParameters, + AssertionStdParameterType, + AssertionType, + BatchSpec, + DatasetAssertionInfo, + DatasetAssertionScope, +) +from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance +from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass +from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED +from datahub.utilities.sql_parser import DefaultSQLParser +from great_expectations.checkpoint.actions import ValidationAction +from great_expectations.core.batch import Batch +from great_expectations.core.batch_spec import ( + RuntimeDataBatchSpec, + RuntimeQueryBatchSpec, + SqlAlchemyDatasourceBatchSpec, +) +from great_expectations.core.expectation_validation_result import ( + ExpectationSuiteValidationResult, +) +from great_expectations.data_asset.data_asset import DataAsset +from great_expectations.data_context import AbstractDataContext +from great_expectations.data_context.types.resource_identifiers import ( + ExpectationSuiteIdentifier, + ValidationResultIdentifier, +) +from great_expectations.execution_engine import PandasExecutionEngine +from great_expectations.execution_engine.sqlalchemy_execution_engine import ( + SqlAlchemyExecutionEngine, +) +from great_expectations.validator.validator import Validator +from sqlalchemy.engine.base import Connection, Engine +from sqlalchemy.engine.url import make_url + +if TYPE_CHECKING: + from great_expectations.data_context.types.resource_identifiers import ( + GXCloudIdentifier, + ) + +assert MARKUPSAFE_PATCHED +logger = logging.getLogger(__name__) +if get_boolean_env_variable("DATAHUB_DEBUG", False): + handler = logging.StreamHandler(stream=sys.stdout) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + +GE_PLATFORM_NAME = "great-expectations" + + +class DataHubValidationAction(ValidationAction): + def __init__( + self, + data_context: AbstractDataContext, + 
server_url: str, + env: str = builder.DEFAULT_ENV, + platform_alias: Optional[str] = None, + platform_instance_map: Optional[Dict[str, str]] = None, + graceful_exceptions: bool = True, + token: Optional[str] = None, + timeout_sec: Optional[float] = None, + retry_status_codes: Optional[List[int]] = None, + retry_max_times: Optional[int] = None, + extra_headers: Optional[Dict[str, str]] = None, + exclude_dbname: Optional[bool] = None, + parse_table_names_from_sql: bool = False, + convert_urns_to_lowercase: bool = False, + name: str = "DataHubValidationAction", + ): + + super().__init__(data_context) + self.server_url = server_url + self.env = env + self.platform_alias = platform_alias + self.platform_instance_map = platform_instance_map + self.graceful_exceptions = graceful_exceptions + self.token = token + self.timeout_sec = timeout_sec + self.retry_status_codes = retry_status_codes + self.retry_max_times = retry_max_times + self.extra_headers = extra_headers + self.exclude_dbname = exclude_dbname + self.parse_table_names_from_sql = parse_table_names_from_sql + self.convert_urns_to_lowercase = convert_urns_to_lowercase + + def _run( + self, + validation_result_suite: ExpectationSuiteValidationResult, + validation_result_suite_identifier: Union[ + ValidationResultIdentifier, "GXCloudIdentifier" + ], + data_asset: Union[Validator, DataAsset, Batch], + payload: Optional[Any] = None, + expectation_suite_identifier: Optional[ExpectationSuiteIdentifier] = None, + checkpoint_identifier: Optional[Any] = None, + ) -> Dict: + datasets = [] + try: + emitter = DatahubRestEmitter( + gms_server=self.server_url, + token=self.token, + read_timeout_sec=self.timeout_sec, + connect_timeout_sec=self.timeout_sec, + retry_status_codes=self.retry_status_codes, + retry_max_times=self.retry_max_times, + extra_headers=self.extra_headers, + ) + + expectation_suite_name = validation_result_suite.meta.get( + "expectation_suite_name" + ) + run_id = validation_result_suite.meta.get("run_id") + if hasattr(data_asset, "active_batch_id"): + batch_identifier = data_asset.active_batch_id + else: + batch_identifier = data_asset.batch_id + + if isinstance( + validation_result_suite_identifier, ValidationResultIdentifier + ): + expectation_suite_name = ( + validation_result_suite_identifier.expectation_suite_identifier.expectation_suite_name + ) + run_id = validation_result_suite_identifier.run_id + batch_identifier = validation_result_suite_identifier.batch_identifier + + # Returns datasets and corresponding batch requests + datasets = self.get_dataset_partitions(batch_identifier, data_asset) + + if len(datasets) == 0 or datasets[0]["dataset_urn"] is None: + warn("Metadata not sent to datahub. No datasets found.") + return {"datahub_notification_result": "none required"} + + # Returns assertion info and assertion results + assertions = self.get_assertions_with_results( + validation_result_suite, + expectation_suite_name, + run_id, + payload, + datasets, + ) + + logger.info("Sending metadata to datahub ...") + logger.info("Dataset URN - {urn}".format(urn=datasets[0]["dataset_urn"])) + + for assertion in assertions: + logger.info( + "Assertion URN - {urn}".format(urn=assertion["assertionUrn"]) + ) + + # Construct a MetadataChangeProposalWrapper object. + assertion_info_mcp = MetadataChangeProposalWrapper( + entityUrn=assertion["assertionUrn"], + aspect=assertion["assertionInfo"], + ) + emitter.emit_mcp(assertion_info_mcp) + + # Construct a MetadataChangeProposalWrapper object. 
+ assertion_platform_mcp = MetadataChangeProposalWrapper( + entityUrn=assertion["assertionUrn"], + aspect=assertion["assertionPlatform"], + ) + emitter.emit_mcp(assertion_platform_mcp) + + for assertionResult in assertion["assertionResults"]: + dataset_assertionResult_mcp = MetadataChangeProposalWrapper( + entityUrn=assertionResult.assertionUrn, + aspect=assertionResult, + ) + + # Emit Result! (timeseries aspect) + emitter.emit_mcp(dataset_assertionResult_mcp) + logger.info("Metadata sent to datahub.") + result = "DataHub notification succeeded" + except Exception as e: + result = "DataHub notification failed" + if self.graceful_exceptions: + logger.error(e) + logger.info("Suppressing error because graceful_exceptions is set") + else: + raise + + return {"datahub_notification_result": result} + + def get_assertions_with_results( + self, + validation_result_suite, + expectation_suite_name, + run_id, + payload, + datasets, + ): + dataPlatformInstance = DataPlatformInstance( + platform=builder.make_data_platform_urn(GE_PLATFORM_NAME) + ) + docs_link = None + if payload: + # process the payload + for action_names in payload.keys(): + if payload[action_names]["class"] == "UpdateDataDocsAction": + data_docs_pages = payload[action_names] + for docs_link_key, docs_link_val in data_docs_pages.items(): + if "file://" not in docs_link_val and docs_link_key != "class": + docs_link = docs_link_val + + assertions_with_results = [] + for result in validation_result_suite.results: + expectation_config = result["expectation_config"] + expectation_type = expectation_config["expectation_type"] + success = bool(result["success"]) + kwargs = { + k: v for k, v in expectation_config["kwargs"].items() if k != "batch_id" + } + + result = result["result"] + assertion_datasets = [d["dataset_urn"] for d in datasets] + if len(datasets) == 1 and "column" in kwargs: + assertion_fields = [ + builder.make_schema_field_urn( + datasets[0]["dataset_urn"], kwargs["column"] + ) + ] + else: + assertion_fields = None # type:ignore + + # Be careful what fields to consider for creating assertion urn. + # Any change in fields below would lead to a new assertion + # FIXME - Currently, when using evaluation parameters, new assertion is + # created when runtime resolved kwargs are different, + # possibly for each validation run + assertionUrn = builder.make_assertion_urn( + builder.datahub_guid( + pre_json_transform( + { + "platform": GE_PLATFORM_NAME, + "nativeType": expectation_type, + "nativeParameters": kwargs, + "dataset": assertion_datasets[0], + "fields": assertion_fields, + } + ) + ) + ) + logger.debug( + "GE expectation_suite_name - {name}, expectation_type - {type}, Assertion URN - {urn}".format( + name=expectation_suite_name, type=expectation_type, urn=assertionUrn + ) + ) + assertionInfo: AssertionInfo = self.get_assertion_info( + expectation_type, + kwargs, + assertion_datasets[0], + assertion_fields, + expectation_suite_name, + ) + + # TODO: Understand why their run time is incorrect. 
+ run_time = run_id.run_time.astimezone(timezone.utc) + evaluation_parameters = ( + { + k: convert_to_string(v) + for k, v in validation_result_suite.evaluation_parameters.items() + if k and v + } + if validation_result_suite.evaluation_parameters + else None + ) + + nativeResults = { + k: convert_to_string(v) + for k, v in result.items() + if ( + k + in [ + "observed_value", + "partial_unexpected_list", + "partial_unexpected_counts", + "details", + ] + and v + ) + } + + actualAggValue = ( + result.get("observed_value") + if isinstance(result.get("observed_value"), (int, float)) + else None + ) + + ds = datasets[0] + # https://docs.greatexpectations.io/docs/reference/expectations/result_format/ + assertionResult = AssertionRunEvent( + timestampMillis=int(round(time.time() * 1000)), + assertionUrn=assertionUrn, + asserteeUrn=ds["dataset_urn"], + runId=run_time.strftime("%Y-%m-%dT%H:%M:%SZ"), + result=AssertionResult( + type=( + AssertionResultType.SUCCESS + if success + else AssertionResultType.FAILURE + ), + rowCount=parse_int_or_default(result.get("element_count")), + missingCount=parse_int_or_default(result.get("missing_count")), + unexpectedCount=parse_int_or_default( + result.get("unexpected_count") + ), + actualAggValue=actualAggValue, + externalUrl=docs_link, + nativeResults=nativeResults, + ), + batchSpec=ds["batchSpec"], + status=AssertionRunStatus.COMPLETE, + runtimeContext=evaluation_parameters, + ) + if ds.get("partitionSpec") is not None: + assertionResult.partitionSpec = ds.get("partitionSpec") + assertionResults = [assertionResult] + assertions_with_results.append( + { + "assertionUrn": assertionUrn, + "assertionInfo": assertionInfo, + "assertionPlatform": dataPlatformInstance, + "assertionResults": assertionResults, + } + ) + return assertions_with_results + + def get_assertion_info( + self, expectation_type, kwargs, dataset, fields, expectation_suite_name + ): + # TODO - can we find exact type of min and max value + def get_min_max(kwargs, type=AssertionStdParameterType.UNKNOWN): + return AssertionStdParameters( + minValue=AssertionStdParameter( + value=convert_to_string(kwargs.get("min_value")), + type=type, + ), + maxValue=AssertionStdParameter( + value=convert_to_string(kwargs.get("max_value")), + type=type, + ), + ) + + known_expectations: Dict[str, DataHubStdAssertion] = { + # column aggregate expectations + "expect_column_min_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.MIN, + parameters=get_min_max(kwargs), + ), + "expect_column_max_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.MAX, + parameters=get_min_max(kwargs), + ), + "expect_column_median_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.MEDIAN, + parameters=get_min_max(kwargs), + ), + "expect_column_stdev_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.STDDEV, + parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), + ), + "expect_column_mean_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.MEAN, + parameters=get_min_max(kwargs, 
AssertionStdParameterType.NUMBER), + ), + "expect_column_unique_value_count_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.UNIQUE_COUNT, + parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), + ), + "expect_column_proportion_of_unique_values_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.UNIQUE_PROPOTION, + parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), + ), + "expect_column_sum_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.SUM, + parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), + ), + "expect_column_quantile_values_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation._NATIVE_, + ), + # column map expectations + "expect_column_values_to_not_be_null": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.NOT_NULL, + aggregation=AssertionStdAggregation.IDENTITY, + ), + "expect_column_values_to_be_in_set": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.IN, + aggregation=AssertionStdAggregation.IDENTITY, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=convert_to_string(kwargs.get("value_set")), + type=AssertionStdParameterType.SET, + ) + ), + ), + "expect_column_values_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.IDENTITY, + parameters=get_min_max(kwargs), + ), + "expect_column_values_to_match_regex": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.REGEX_MATCH, + aggregation=AssertionStdAggregation.IDENTITY, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=kwargs.get("regex"), + type=AssertionStdParameterType.STRING, + ) + ), + ), + "expect_column_values_to_match_regex_list": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_COLUMN, + operator=AssertionStdOperator.REGEX_MATCH, + aggregation=AssertionStdAggregation.IDENTITY, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=convert_to_string(kwargs.get("regex_list")), + type=AssertionStdParameterType.LIST, + ) + ), + ), + "expect_table_columns_to_match_ordered_list": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_SCHEMA, + operator=AssertionStdOperator.EQUAL_TO, + aggregation=AssertionStdAggregation.COLUMNS, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=convert_to_string(kwargs.get("column_list")), + type=AssertionStdParameterType.LIST, + ) + ), + ), + "expect_table_columns_to_match_set": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_SCHEMA, + operator=AssertionStdOperator.EQUAL_TO, + aggregation=AssertionStdAggregation.COLUMNS, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=convert_to_string(kwargs.get("column_set")), + type=AssertionStdParameterType.SET, + ) + ), + ), + "expect_table_column_count_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_SCHEMA, + operator=AssertionStdOperator.BETWEEN, 
+ aggregation=AssertionStdAggregation.COLUMN_COUNT, + parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), + ), + "expect_table_column_count_to_equal": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_SCHEMA, + operator=AssertionStdOperator.EQUAL_TO, + aggregation=AssertionStdAggregation.COLUMN_COUNT, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=convert_to_string(kwargs.get("value")), + type=AssertionStdParameterType.NUMBER, + ) + ), + ), + "expect_column_to_exist": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_SCHEMA, + operator=AssertionStdOperator._NATIVE_, + aggregation=AssertionStdAggregation._NATIVE_, + ), + "expect_table_row_count_to_equal": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_ROWS, + operator=AssertionStdOperator.EQUAL_TO, + aggregation=AssertionStdAggregation.ROW_COUNT, + parameters=AssertionStdParameters( + value=AssertionStdParameter( + value=convert_to_string(kwargs.get("value")), + type=AssertionStdParameterType.NUMBER, + ) + ), + ), + "expect_table_row_count_to_be_between": DataHubStdAssertion( + scope=DatasetAssertionScope.DATASET_ROWS, + operator=AssertionStdOperator.BETWEEN, + aggregation=AssertionStdAggregation.ROW_COUNT, + parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), + ), + } + + datasetAssertionInfo = DatasetAssertionInfo( + dataset=dataset, + fields=fields, + operator=AssertionStdOperator._NATIVE_, + aggregation=AssertionStdAggregation._NATIVE_, + nativeType=expectation_type, + nativeParameters={k: convert_to_string(v) for k, v in kwargs.items()}, + scope=DatasetAssertionScope.DATASET_ROWS, + ) + + if expectation_type in known_expectations.keys(): + assertion = known_expectations[expectation_type] + datasetAssertionInfo.scope = assertion.scope + datasetAssertionInfo.aggregation = assertion.aggregation + datasetAssertionInfo.operator = assertion.operator + datasetAssertionInfo.parameters = assertion.parameters + + # Heuristically mapping other expectations + else: + if "column" in kwargs and expectation_type.startswith( + "expect_column_value" + ): + datasetAssertionInfo.scope = DatasetAssertionScope.DATASET_COLUMN + datasetAssertionInfo.aggregation = AssertionStdAggregation.IDENTITY + elif "column" in kwargs: + datasetAssertionInfo.scope = DatasetAssertionScope.DATASET_COLUMN + datasetAssertionInfo.aggregation = AssertionStdAggregation._NATIVE_ + + return AssertionInfo( + type=AssertionType.DATASET, + datasetAssertion=datasetAssertionInfo, + customProperties={"expectation_suite_name": expectation_suite_name}, + ) + + def get_dataset_partitions(self, batch_identifier, data_asset): + dataset_partitions = [] + + logger.debug("Finding datasets being validated") + + # for now, we support only v3-api and sqlalchemy execution engine and Pandas engine + is_sql_alchemy = isinstance(data_asset, Validator) and ( + isinstance(data_asset.execution_engine, SqlAlchemyExecutionEngine) + ) + is_pandas = isinstance(data_asset.execution_engine, PandasExecutionEngine) + if is_sql_alchemy or is_pandas: + ge_batch_spec = data_asset.active_batch_spec + partitionSpec = None + batchSpecProperties = { + "data_asset_name": str( + data_asset.active_batch_definition.data_asset_name + ), + "datasource_name": str( + data_asset.active_batch_definition.datasource_name + ), + } + sqlalchemy_uri = None + if is_sql_alchemy and isinstance( + data_asset.execution_engine.engine, Engine + ): + sqlalchemy_uri = data_asset.execution_engine.engine.url + # For snowflake 
sqlalchemy_execution_engine.engine is actually instance of Connection + elif is_sql_alchemy and isinstance( + data_asset.execution_engine.engine, Connection + ): + sqlalchemy_uri = data_asset.execution_engine.engine.engine.url + + if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec): + # e.g. ConfiguredAssetSqlDataConnector with splitter_method or sampling_method + schema_name = ge_batch_spec.get("schema_name") + table_name = ge_batch_spec.get("table_name") + + dataset_urn = make_dataset_urn_from_sqlalchemy_uri( + sqlalchemy_uri, + schema_name, + table_name, + self.env, + self.get_platform_instance( + data_asset.active_batch_definition.datasource_name + ), + self.exclude_dbname, + self.platform_alias, + self.convert_urns_to_lowercase, + ) + batchSpec = BatchSpec( + nativeBatchId=batch_identifier, + customProperties=batchSpecProperties, + ) + + splitter_method = ge_batch_spec.get("splitter_method") + if ( + splitter_method is not None + and splitter_method != "_split_on_whole_table" + ): + batch_identifiers = ge_batch_spec.get("batch_identifiers", {}) + partitionSpec = PartitionSpecClass( + partition=convert_to_string(batch_identifiers) + ) + sampling_method = ge_batch_spec.get("sampling_method", "") + if sampling_method == "_sample_using_limit": + batchSpec.limit = ge_batch_spec["sampling_kwargs"]["n"] + + dataset_partitions.append( + { + "dataset_urn": dataset_urn, + "partitionSpec": partitionSpec, + "batchSpec": batchSpec, + } + ) + elif isinstance(ge_batch_spec, RuntimeQueryBatchSpec): + if not self.parse_table_names_from_sql: + warn( + "Enable parse_table_names_from_sql in DatahubValidationAction config\ + to try to parse the tables being asserted from SQL query" + ) + return [] + query = data_asset.batches[ + batch_identifier + ].batch_request.runtime_parameters["query"] + partitionSpec = PartitionSpecClass( + type=PartitionTypeClass.QUERY, + partition=f"Query_{builder.datahub_guid(pre_json_transform(query))}", + ) + + batchSpec = BatchSpec( + nativeBatchId=batch_identifier, + query=query, + customProperties=batchSpecProperties, + ) + try: + tables = DefaultSQLParser(query).get_tables() + except Exception as e: + logger.warning(f"Sql parser failed on {query} with {e}") + tables = [] + + if len(set(tables)) != 1: + warn( + "DataHubValidationAction does not support cross dataset assertions." 
+ ) + return [] + for table in tables: + dataset_urn = make_dataset_urn_from_sqlalchemy_uri( + sqlalchemy_uri, + None, + table, + self.env, + self.get_platform_instance( + data_asset.active_batch_definition.datasource_name + ), + self.exclude_dbname, + self.platform_alias, + self.convert_urns_to_lowercase, + ) + dataset_partitions.append( + { + "dataset_urn": dataset_urn, + "partitionSpec": partitionSpec, + "batchSpec": batchSpec, + } + ) + elif isinstance(ge_batch_spec, RuntimeDataBatchSpec): + data_platform = self.get_platform_instance( + data_asset.active_batch_definition.datasource_name + ) + dataset_urn = builder.make_dataset_urn_with_platform_instance( + platform=( + data_platform + if self.platform_alias is None + else self.platform_alias + ), + name=data_asset.active_batch_definition.datasource_name, + platform_instance="", + env=self.env, + ) + batchSpec = BatchSpec( + nativeBatchId=batch_identifier, + query="", + customProperties=batchSpecProperties, + ) + dataset_partitions.append( + { + "dataset_urn": dataset_urn, + "partitionSpec": partitionSpec, + "batchSpec": batchSpec, + } + ) + else: + warn( + "DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format( + batch_spec_type=type(ge_batch_spec) + ) + ) + else: + # TODO - v2-spec - SqlAlchemyDataset support + warn( + "DataHubValidationAction does not recognize this GE data asset type - {asset_type}. This is either using v2-api or execution engine other than sqlalchemy.".format( + asset_type=type(data_asset) + ) + ) + + return dataset_partitions + + def get_platform_instance(self, datasource_name): + if self.platform_instance_map and datasource_name in self.platform_instance_map: + return self.platform_instance_map[datasource_name] + else: + warn( + f"Datasource {datasource_name} is not present in platform_instance_map" + ) + return None + + +def parse_int_or_default(value, default_value=None): + if value is None: + return default_value + else: + return int(value) + + +def make_dataset_urn_from_sqlalchemy_uri( + sqlalchemy_uri, + schema_name, + table_name, + env, + platform_instance=None, + exclude_dbname=None, + platform_alias=None, + convert_urns_to_lowercase=False, +): + data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri)) + url_instance = make_url(sqlalchemy_uri) + + if schema_name is None and "." in table_name: + schema_name, table_name = table_name.split(".")[-2:] + + if data_platform in ["redshift", "postgres"]: + schema_name = schema_name or "public" + if url_instance.database is None: + warn( + f"DataHubValidationAction failed to locate database name for {data_platform}." + ) + return None + schema_name = ( + schema_name if exclude_dbname else f"{url_instance.database}.{schema_name}" + ) + elif data_platform == "mssql": + schema_name = schema_name or "dbo" + if url_instance.database is None: + warn( + f"DataHubValidationAction failed to locate database name for {data_platform}." + ) + return None + schema_name = ( + schema_name if exclude_dbname else f"{url_instance.database}.{schema_name}" + ) + elif data_platform in ["trino", "snowflake"]: + if schema_name is None or url_instance.database is None: + warn( + "DataHubValidationAction failed to locate schema name and/or database name for {data_platform}.".format( + data_platform=data_platform + ) + ) + return None + # If data platform is snowflake, we artificially lowercase the Database name. + # This is because DataHub also does this during ingestion. 
+ # Ref: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L155 + database_name = ( + url_instance.database.lower() + if data_platform == "snowflake" + else url_instance.database + ) + if database_name.endswith(f"/{schema_name}"): + database_name = database_name[: -len(f"/{schema_name}")] + schema_name = ( + schema_name if exclude_dbname else f"{database_name}.{schema_name}" + ) + + elif data_platform == "bigquery": + if url_instance.host is None or url_instance.database is None: + warn( + "DataHubValidationAction failed to locate host and/or database name for {data_platform}. ".format( + data_platform=data_platform + ) + ) + return None + schema_name = f"{url_instance.host}.{url_instance.database}" + + schema_name = schema_name or url_instance.database + if schema_name is None: + warn( + f"DataHubValidationAction failed to locate schema name for {data_platform}." + ) + return None + + dataset_name = f"{schema_name}.{table_name}" + + if convert_urns_to_lowercase: + dataset_name = dataset_name.lower() + + dataset_urn = builder.make_dataset_urn_with_platform_instance( + platform=data_platform if platform_alias is None else platform_alias, + name=dataset_name, + platform_instance=platform_instance, + env=env, + ) + + return dataset_urn + + +@dataclass +class DataHubStdAssertion: + scope: Union[str, DatasetAssertionScope] + operator: Union[str, AssertionStdOperator] + aggregation: Union[str, AssertionStdAggregation] + parameters: Optional[AssertionStdParameters] = None + + +class DecimalEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, Decimal): + return str(o) + return super().default(o) + + +def convert_to_string(var: Any) -> str: + try: + tmp = ( + str(var) + if isinstance(var, (str, int, float)) + else json.dumps(var, cls=DecimalEncoder) + ) + except TypeError as e: + logger.debug(e) + tmp = str(var) + return tmp + + +def warn(msg): + logger.warning(msg) diff --git a/metadata-ingestion-modules/gx-plugin/tests/__init__.py b/metadata-ingestion-modules/gx-plugin/tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion-modules/gx-plugin/tests/conftest.py b/metadata-ingestion-modules/gx-plugin/tests/conftest.py new file mode 100644 index 0000000000000..c99230fba3094 --- /dev/null +++ b/metadata-ingestion-modules/gx-plugin/tests/conftest.py @@ -0,0 +1 @@ +from datahub.testing.docker_utils import docker_compose_runner # noqa: F401 diff --git a/metadata-ingestion/tests/integration/great-expectations/docker-compose.yml b/metadata-ingestion-modules/gx-plugin/tests/integration/docker-compose.yml similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/docker-compose.yml rename to metadata-ingestion-modules/gx-plugin/tests/integration/docker-compose.yml diff --git a/metadata-ingestion/tests/integration/great-expectations/ge_mcps_golden.json b/metadata-ingestion-modules/gx-plugin/tests/integration/ge_mcps_golden.json similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/ge_mcps_golden.json rename to metadata-ingestion-modules/gx-plugin/tests/integration/ge_mcps_golden.json diff --git a/metadata-ingestion/tests/integration/great-expectations/ge_mcps_golden_2.json b/metadata-ingestion-modules/gx-plugin/tests/integration/ge_mcps_golden_2.json similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/ge_mcps_golden_2.json rename to 
metadata-ingestion-modules/gx-plugin/tests/integration/ge_mcps_golden_2.json diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/checkpoints/test_checkpoint.yml similarity index 97% rename from metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/checkpoints/test_checkpoint.yml index 466cbfe39a4ab..0e6fa886d5784 100644 --- a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml +++ b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/checkpoints/test_checkpoint.yml @@ -19,7 +19,7 @@ action_list: site_names: [] - name: datahub_action action: - module_name: datahub.integrations.great_expectations.action + module_name: datahub_gx_plugin.action class_name: DataHubValidationAction server_url: http://localhost:8080 graceful_exceptions: False diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint_2.yml b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/checkpoints/test_checkpoint_2.yml similarity index 97% rename from metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint_2.yml rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/checkpoints/test_checkpoint_2.yml index 409d93f64db16..d0fa2a8c17992 100644 --- a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint_2.yml +++ b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/checkpoints/test_checkpoint_2.yml @@ -19,7 +19,7 @@ action_list: site_names: [] - name: datahub_action action: - module_name: datahub.integrations.great_expectations.action + module_name: datahub_gx_plugin.action class_name: DataHubValidationAction server_url: http://localhost:8080 graceful_exceptions: False diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/expectations/.ge_store_backend_id b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/expectations/.ge_store_backend_id similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/expectations/.ge_store_backend_id rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/expectations/.ge_store_backend_id diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/expectations/test_suite.json b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/expectations/test_suite.json similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/expectations/test_suite.json rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/expectations/test_suite.json diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/great_expectations.yml b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/great_expectations.yml similarity index 100% rename from 
metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/great_expectations.yml rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/great_expectations.yml diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/plugins/custom_data_docs/styles/data_docs_custom_styles.css b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/plugins/custom_data_docs/styles/data_docs_custom_styles.css similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/plugins/custom_data_docs/styles/data_docs_custom_styles.css rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/great_expectations/plugins/custom_data_docs/styles/data_docs_custom_styles.css diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/setup.sql b/metadata-ingestion-modules/gx-plugin/tests/integration/setup/setup.sql similarity index 100% rename from metadata-ingestion/tests/integration/great-expectations/setup/setup.sql rename to metadata-ingestion-modules/gx-plugin/tests/integration/setup/setup.sql diff --git a/metadata-ingestion/tests/integration/great-expectations/test_great_expectations.py b/metadata-ingestion-modules/gx-plugin/tests/integration/test_great_expectations.py similarity index 68% rename from metadata-ingestion/tests/integration/great-expectations/test_great_expectations.py rename to metadata-ingestion-modules/gx-plugin/tests/integration/test_great_expectations.py index 0bb87b993e6b0..b03681dc78058 100644 --- a/metadata-ingestion/tests/integration/great-expectations/test_great_expectations.py +++ b/metadata-ingestion-modules/gx-plugin/tests/integration/test_great_expectations.py @@ -1,17 +1,30 @@ +import os import shutil from typing import List from unittest import mock +import packaging.version import pytest -from freezegun import freeze_time -from great_expectations.data_context.data_context.file_data_context import ( - FileDataContext, -) - from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.sink.file import write_metadata_file -from tests.test_helpers import mce_helpers -from tests.test_helpers.docker_helpers import wait_for_port +from datahub.testing.compare_metadata_json import assert_metadata_files_equal +from datahub.testing.docker_utils import wait_for_port +from freezegun import freeze_time +from great_expectations.data_context import FileDataContext + +try: + from great_expectations import __version__ as GX_VERSION # type: ignore + + use_gx_folder = packaging.version.parse(GX_VERSION) > packaging.version.Version( + "0.17.0" + ) +except Exception: + use_gx_folder = False + + +def should_update_golden_file() -> bool: + return bool(os.getenv("DATAHUB_GOLDEN_FILE_UPDATE", False)) + FROZEN_TIME = "2021-12-28 12:00:00" @@ -40,12 +53,11 @@ def test_ge_ingest( docker_compose_runner, pytestconfig, tmp_path, - mock_time, checkpoint, golden_json, **kwargs, ): - test_resources_dir = pytestconfig.rootpath / "tests/integration/great-expectations" + test_resources_dir = pytestconfig.rootpath / "tests/integration" with docker_compose_runner( test_resources_dir / "docker-compose.yml", "great-expectations" @@ -57,18 +69,21 @@ def test_ge_ingest( emitter = MockDatahubEmitter("") mock_emit_mcp.side_effect = emitter.emit_mcp + gx_context_folder_name = "gx" if use_gx_folder else "great_expectations" shutil.copytree( test_resources_dir / "setup/great_expectations", - tmp_path / 
"great_expectations", + tmp_path / gx_context_folder_name, ) + context = FileDataContext.create(tmp_path) context.run_checkpoint(checkpoint_name=checkpoint) emitter.write_to_file(tmp_path / "ge_mcps.json") - mce_helpers.check_golden_file( - pytestconfig, + assert_metadata_files_equal( output_path=tmp_path / "ge_mcps.json", golden_path=test_resources_dir / golden_json, + copy_output=False, + update_golden=should_update_golden_file(), ignore_paths=[], ) diff --git a/metadata-ingestion/tests/unit/test_great_expectations_action.py b/metadata-ingestion-modules/gx-plugin/tests/unit/test_great_expectations_action.py similarity index 98% rename from metadata-ingestion/tests/unit/test_great_expectations_action.py rename to metadata-ingestion-modules/gx-plugin/tests/unit/test_great_expectations_action.py index 2e23949d29689..c870a4449abea 100644 --- a/metadata-ingestion/tests/unit/test_great_expectations_action.py +++ b/metadata-ingestion-modules/gx-plugin/tests/unit/test_great_expectations_action.py @@ -4,6 +4,22 @@ import pandas as pd import pytest +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.metadata.schema_classes import ( + AssertionInfoClass, + AssertionResultClass, + AssertionResultTypeClass, + AssertionRunEventClass, + AssertionRunStatusClass, + AssertionStdParameterClass, + AssertionStdParametersClass, + AssertionTypeClass, + BatchSpecClass, + DataPlatformInstanceClass, + DatasetAssertionInfoClass, + DatasetAssertionScopeClass, + PartitionSpecClass, +) from great_expectations.core.batch import Batch, BatchDefinition, BatchRequest from great_expectations.core.batch_spec import ( RuntimeDataBatchSpec, @@ -14,10 +30,7 @@ ) from great_expectations.core.id_dict import IDDict from great_expectations.core.run_identifier import RunIdentifier -from great_expectations.data_context import DataContext -from great_expectations.data_context.data_context.file_data_context import ( - FileDataContext, -) +from great_expectations.data_context import DataContext, FileDataContext from great_expectations.data_context.types.resource_identifiers import ( ExpectationSuiteIdentifier, ValidationResultIdentifier, @@ -33,23 +46,7 @@ ) from great_expectations.validator.validator import Validator -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.integrations.great_expectations.action import DataHubValidationAction -from datahub.metadata.schema_classes import ( - AssertionInfoClass, - AssertionResultClass, - AssertionResultTypeClass, - AssertionRunEventClass, - AssertionRunStatusClass, - AssertionStdParameterClass, - AssertionStdParametersClass, - AssertionTypeClass, - BatchSpecClass, - DataPlatformInstanceClass, - DatasetAssertionInfoClass, - DatasetAssertionScopeClass, - PartitionSpecClass, -) +from datahub_gx_plugin.action import DataHubValidationAction logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md index e0dbc7c8d4b14..b37c4e5ad9673 100644 --- a/metadata-ingestion/developing.md +++ b/metadata-ingestion/developing.md @@ -68,6 +68,18 @@ cd metadata-ingestion-modules/dagster-plugin source venv/bin/activate datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)" ``` + +### (Optional) Set up your Python environment for developing on GX Plugin + +From the repository root: + +```shell +cd metadata-ingestion-modules/gx-plugin +../../gradlew :metadata-ingestion-modules:gx-plugin:installDev +source venv/bin/activate +datahub version # should print "DataHub CLI 
version: unavailable (installed in develop mode)" +``` + ### Common setup issues Common issues (click to expand): diff --git a/metadata-ingestion/integration_docs/great-expectations.md b/metadata-ingestion/integration_docs/great-expectations.md index 80f5bedf42661..9a4097a8f3af3 100644 --- a/metadata-ingestion/integration_docs/great-expectations.md +++ b/metadata-ingestion/integration_docs/great-expectations.md @@ -23,7 +23,7 @@ This integration does not support 1. Install the required dependency in your Great Expectations environment. ```shell - pip install 'acryl-datahub[great-expectations]' + pip install 'acryl-datahub-gx-plugin' ``` @@ -32,7 +32,7 @@ This integration does not support action_list: - name: datahub_action action: - module_name: datahub.integrations.great_expectations.action + module_name: datahub_gx_plugin.action class_name: DataHubValidationAction server_url: http://localhost:8080 #datahub server url ``` diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index d59545694c324..88c60e00c2e90 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -8,7 +8,9 @@ _version: str = package_metadata["__version__"] _self_pin = ( - f"=={_version}" if not (_version.endswith("dev0") or "docker" in _version) else "" + f"=={_version}" + if not (_version.endswith(("dev0", "dev1")) or "docker" in _version) + else "" ) base_requirements = { @@ -173,7 +175,7 @@ *sqlglot_lib, "GitPython>2", "python-liquid", - "deepmerge>=1.1.1" + "deepmerge>=1.1.1", } bigquery_common = { @@ -332,7 +334,9 @@ "gql[requests]>=3.3.0", }, "datahub": mysql | kafka_common, - "great-expectations": sql_common | sqllineage_lib, + "great-expectations": { + f"acryl-datahub-gx-plugin{_self_pin}", + }, # Misc plugins. "sql-parser": sqlglot_lib, # Source plugins @@ -482,6 +486,9 @@ # The Airflow extra is only retained for compatibility, but new users should # be using the datahub-airflow-plugin package instead. "airflow", + # The great-expectations extra is only retained for compatibility, but new users should + # be using the datahub-gx-plugin package instead. + "great-expectations", # SQL Server ODBC requires additional drivers, and so we don't want to keep # it included in the default "all" installation. 
"mssql-odbc", @@ -527,9 +534,12 @@ } -pytest_dep = "pytest>=6.2.2" -deepdiff_dep = "deepdiff" -test_api_requirements = {pytest_dep, deepdiff_dep, "PyYAML"} +test_api_requirements = { + "pytest>=6.2.2", + "deepdiff", + "PyYAML", + "pytest-docker>=1.1.0", +} debug_requirements = { "memray", @@ -551,12 +561,9 @@ "isort>=5.7.0", "mypy==1.10.1", *test_api_requirements, - pytest_dep, "pytest-asyncio>=0.16.0", "pytest-cov>=2.8.1", - "pytest-docker>=1.1.0", "pytest-random-order~=1.1.0", - deepdiff_dep, "requests-mock", "freezegun", "jsonpickle", @@ -590,7 +597,6 @@ "kafka", "datahub-rest", "datahub-lite", - "great-expectations", "presto", "redash", "redshift", diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index 94501b0d499b7..cdc8c8268b488 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -1,867 +1,3 @@ -from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED +from datahub_gx_plugin.action import DataHubValidationAction -import json -import logging -import sys -import time -from dataclasses import dataclass -from datetime import timezone -from decimal import Decimal -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union - -from great_expectations.checkpoint.actions import ValidationAction -from great_expectations.core.batch import Batch -from great_expectations.core.batch_spec import ( - RuntimeDataBatchSpec, - RuntimeQueryBatchSpec, - SqlAlchemyDatasourceBatchSpec, -) -from great_expectations.core.expectation_validation_result import ( - ExpectationSuiteValidationResult, -) -from great_expectations.data_asset.data_asset import DataAsset -from great_expectations.data_context.data_context import DataContext -from great_expectations.data_context.types.resource_identifiers import ( - ExpectationSuiteIdentifier, - ValidationResultIdentifier, -) -from great_expectations.execution_engine import PandasExecutionEngine -from great_expectations.execution_engine.sqlalchemy_execution_engine import ( - SqlAlchemyExecutionEngine, -) -from great_expectations.validator.validator import Validator -from sqlalchemy.engine.base import Connection, Engine -from sqlalchemy.engine.url import make_url - -import datahub.emitter.mce_builder as builder -from datahub.cli.env_utils import get_boolean_env_variable -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.emitter.serialization_helper import pre_json_transform -from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( - get_platform_from_sqlalchemy_uri, -) -from datahub.metadata.com.linkedin.pegasus2avro.assertion import ( - AssertionInfo, - AssertionResult, - AssertionResultType, - AssertionRunEvent, - AssertionRunStatus, - AssertionStdAggregation, - AssertionStdOperator, - AssertionStdParameter, - AssertionStdParameters, - AssertionStdParameterType, - AssertionType, - BatchSpec, - DatasetAssertionInfo, - DatasetAssertionScope, -) -from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance -from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass -from datahub.utilities.sql_parser import DefaultSQLParser - -if TYPE_CHECKING: - from great_expectations.data_context.types.resource_identifiers import ( - GXCloudIdentifier, - ) - -assert MARKUPSAFE_PATCHED -logger = 
logging.getLogger(__name__) -if get_boolean_env_variable("DATAHUB_DEBUG", False): - handler = logging.StreamHandler(stream=sys.stdout) - logger.addHandler(handler) - logger.setLevel(logging.DEBUG) - -GE_PLATFORM_NAME = "great-expectations" - - -class DataHubValidationAction(ValidationAction): - def __init__( - self, - data_context: DataContext, - server_url: str, - env: str = builder.DEFAULT_ENV, - platform_alias: Optional[str] = None, - platform_instance_map: Optional[Dict[str, str]] = None, - graceful_exceptions: bool = True, - token: Optional[str] = None, - timeout_sec: Optional[float] = None, - retry_status_codes: Optional[List[int]] = None, - retry_max_times: Optional[int] = None, - extra_headers: Optional[Dict[str, str]] = None, - exclude_dbname: Optional[bool] = None, - parse_table_names_from_sql: bool = False, - convert_urns_to_lowercase: bool = False, - ): - super().__init__(data_context) - self.server_url = server_url - self.env = env - self.platform_alias = platform_alias - self.platform_instance_map = platform_instance_map - self.graceful_exceptions = graceful_exceptions - self.token = token - self.timeout_sec = timeout_sec - self.retry_status_codes = retry_status_codes - self.retry_max_times = retry_max_times - self.extra_headers = extra_headers - self.exclude_dbname = exclude_dbname - self.parse_table_names_from_sql = parse_table_names_from_sql - self.convert_urns_to_lowercase = convert_urns_to_lowercase - - def _run( - self, - validation_result_suite: ExpectationSuiteValidationResult, - validation_result_suite_identifier: Union[ - ValidationResultIdentifier, "GXCloudIdentifier" - ], - data_asset: Union[Validator, DataAsset, Batch], - payload: Optional[Any] = None, - expectation_suite_identifier: Optional[ExpectationSuiteIdentifier] = None, - checkpoint_identifier: Optional[Any] = None, - ) -> Dict: - datasets = [] - try: - emitter = DatahubRestEmitter( - gms_server=self.server_url, - token=self.token, - read_timeout_sec=self.timeout_sec, - connect_timeout_sec=self.timeout_sec, - retry_status_codes=self.retry_status_codes, - retry_max_times=self.retry_max_times, - extra_headers=self.extra_headers, - ) - - expectation_suite_name = validation_result_suite.meta.get( - "expectation_suite_name" - ) - run_id = validation_result_suite.meta.get("run_id") - if hasattr(data_asset, "active_batch_id"): - batch_identifier = data_asset.active_batch_id - else: - batch_identifier = data_asset.batch_id - - if isinstance( - validation_result_suite_identifier, ValidationResultIdentifier - ): - expectation_suite_name = ( - validation_result_suite_identifier.expectation_suite_identifier.expectation_suite_name - ) - run_id = validation_result_suite_identifier.run_id - batch_identifier = validation_result_suite_identifier.batch_identifier - - # Returns datasets and corresponding batch requests - datasets = self.get_dataset_partitions(batch_identifier, data_asset) - - if len(datasets) == 0 or datasets[0]["dataset_urn"] is None: - warn("Metadata not sent to datahub. 
No datasets found.") - return {"datahub_notification_result": "none required"} - - # Returns assertion info and assertion results - assertions = self.get_assertions_with_results( - validation_result_suite, - expectation_suite_name, - run_id, - payload, - datasets, - ) - - logger.info("Sending metadata to datahub ...") - logger.info("Dataset URN - {urn}".format(urn=datasets[0]["dataset_urn"])) - - for assertion in assertions: - logger.info( - "Assertion URN - {urn}".format(urn=assertion["assertionUrn"]) - ) - - # Construct a MetadataChangeProposalWrapper object. - assertion_info_mcp = MetadataChangeProposalWrapper( - entityUrn=assertion["assertionUrn"], - aspect=assertion["assertionInfo"], - ) - emitter.emit_mcp(assertion_info_mcp) - - # Construct a MetadataChangeProposalWrapper object. - assertion_platform_mcp = MetadataChangeProposalWrapper( - entityUrn=assertion["assertionUrn"], - aspect=assertion["assertionPlatform"], - ) - emitter.emit_mcp(assertion_platform_mcp) - - for assertionResult in assertion["assertionResults"]: - dataset_assertionResult_mcp = MetadataChangeProposalWrapper( - entityUrn=assertionResult.assertionUrn, - aspect=assertionResult, - ) - - # Emit Result! (timeseries aspect) - emitter.emit_mcp(dataset_assertionResult_mcp) - logger.info("Metadata sent to datahub.") - result = "DataHub notification succeeded" - except Exception as e: - result = "DataHub notification failed" - if self.graceful_exceptions: - logger.error(e) - logger.info("Suppressing error because graceful_exceptions is set") - else: - raise - - return {"datahub_notification_result": result} - - def get_assertions_with_results( - self, - validation_result_suite, - expectation_suite_name, - run_id, - payload, - datasets, - ): - dataPlatformInstance = DataPlatformInstance( - platform=builder.make_data_platform_urn(GE_PLATFORM_NAME) - ) - docs_link = None - if payload: - # process the payload - for action_names in payload.keys(): - if payload[action_names]["class"] == "UpdateDataDocsAction": - data_docs_pages = payload[action_names] - for docs_link_key, docs_link_val in data_docs_pages.items(): - if "file://" not in docs_link_val and docs_link_key != "class": - docs_link = docs_link_val - - assertions_with_results = [] - for result in validation_result_suite.results: - expectation_config = result["expectation_config"] - expectation_type = expectation_config["expectation_type"] - success = bool(result["success"]) - kwargs = { - k: v for k, v in expectation_config["kwargs"].items() if k != "batch_id" - } - - result = result["result"] - assertion_datasets = [d["dataset_urn"] for d in datasets] - if len(datasets) == 1 and "column" in kwargs: - assertion_fields = [ - builder.make_schema_field_urn( - datasets[0]["dataset_urn"], kwargs["column"] - ) - ] - else: - assertion_fields = None # type:ignore - - # Be careful what fields to consider for creating assertion urn. 
- # Any change in fields below would lead to a new assertion - # FIXME - Currently, when using evaluation parameters, new assertion is - # created when runtime resolved kwargs are different, - # possibly for each validation run - assertionUrn = builder.make_assertion_urn( - builder.datahub_guid( - pre_json_transform( - { - "platform": GE_PLATFORM_NAME, - "nativeType": expectation_type, - "nativeParameters": kwargs, - "dataset": assertion_datasets[0], - "fields": assertion_fields, - } - ) - ) - ) - logger.debug( - "GE expectation_suite_name - {name}, expectation_type - {type}, Assertion URN - {urn}".format( - name=expectation_suite_name, type=expectation_type, urn=assertionUrn - ) - ) - assertionInfo: AssertionInfo = self.get_assertion_info( - expectation_type, - kwargs, - assertion_datasets[0], - assertion_fields, - expectation_suite_name, - ) - - # TODO: Understand why their run time is incorrect. - run_time = run_id.run_time.astimezone(timezone.utc) - evaluation_parameters = ( - { - k: convert_to_string(v) - for k, v in validation_result_suite.evaluation_parameters.items() - if k and v - } - if validation_result_suite.evaluation_parameters - else None - ) - - nativeResults = { - k: convert_to_string(v) - for k, v in result.items() - if ( - k - in [ - "observed_value", - "partial_unexpected_list", - "partial_unexpected_counts", - "details", - ] - and v - ) - } - - actualAggValue = ( - result.get("observed_value") - if isinstance(result.get("observed_value"), (int, float)) - else None - ) - - ds = datasets[0] - # https://docs.greatexpectations.io/docs/reference/expectations/result_format/ - assertionResult = AssertionRunEvent( - timestampMillis=int(round(time.time() * 1000)), - assertionUrn=assertionUrn, - asserteeUrn=ds["dataset_urn"], - runId=run_time.strftime("%Y-%m-%dT%H:%M:%SZ"), - result=AssertionResult( - type=AssertionResultType.SUCCESS - if success - else AssertionResultType.FAILURE, - rowCount=parse_int_or_default(result.get("element_count")), - missingCount=parse_int_or_default(result.get("missing_count")), - unexpectedCount=parse_int_or_default( - result.get("unexpected_count") - ), - actualAggValue=actualAggValue, - externalUrl=docs_link, - nativeResults=nativeResults, - ), - batchSpec=ds["batchSpec"], - status=AssertionRunStatus.COMPLETE, - runtimeContext=evaluation_parameters, - ) - if ds.get("partitionSpec") is not None: - assertionResult.partitionSpec = ds.get("partitionSpec") - assertionResults = [assertionResult] - assertions_with_results.append( - { - "assertionUrn": assertionUrn, - "assertionInfo": assertionInfo, - "assertionPlatform": dataPlatformInstance, - "assertionResults": assertionResults, - } - ) - return assertions_with_results - - def get_assertion_info( - self, expectation_type, kwargs, dataset, fields, expectation_suite_name - ): - # TODO - can we find exact type of min and max value - def get_min_max(kwargs, type=AssertionStdParameterType.UNKNOWN): - return AssertionStdParameters( - minValue=AssertionStdParameter( - value=convert_to_string(kwargs.get("min_value")), - type=type, - ), - maxValue=AssertionStdParameter( - value=convert_to_string(kwargs.get("max_value")), - type=type, - ), - ) - - known_expectations: Dict[str, DataHubStdAssertion] = { - # column aggregate expectations - "expect_column_min_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.MIN, - parameters=get_min_max(kwargs), - ), - "expect_column_max_to_be_between": 
DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.MAX, - parameters=get_min_max(kwargs), - ), - "expect_column_median_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.MEDIAN, - parameters=get_min_max(kwargs), - ), - "expect_column_stdev_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.STDDEV, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - "expect_column_mean_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.MEAN, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - "expect_column_unique_value_count_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.UNIQUE_COUNT, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - "expect_column_proportion_of_unique_values_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.UNIQUE_PROPOTION, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - "expect_column_sum_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.SUM, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - "expect_column_quantile_values_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation._NATIVE_, - ), - # column map expectations - "expect_column_values_to_not_be_null": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.NOT_NULL, - aggregation=AssertionStdAggregation.IDENTITY, - ), - "expect_column_values_to_be_in_set": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.IN, - aggregation=AssertionStdAggregation.IDENTITY, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=convert_to_string(kwargs.get("value_set")), - type=AssertionStdParameterType.SET, - ) - ), - ), - "expect_column_values_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.IDENTITY, - parameters=get_min_max(kwargs), - ), - "expect_column_values_to_match_regex": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.REGEX_MATCH, - aggregation=AssertionStdAggregation.IDENTITY, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=kwargs.get("regex"), - type=AssertionStdParameterType.STRING, - ) - ), - ), - "expect_column_values_to_match_regex_list": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_COLUMN, - operator=AssertionStdOperator.REGEX_MATCH, - aggregation=AssertionStdAggregation.IDENTITY, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=convert_to_string(kwargs.get("regex_list")), - type=AssertionStdParameterType.LIST, - 
) - ), - ), - "expect_table_columns_to_match_ordered_list": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_SCHEMA, - operator=AssertionStdOperator.EQUAL_TO, - aggregation=AssertionStdAggregation.COLUMNS, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=convert_to_string(kwargs.get("column_list")), - type=AssertionStdParameterType.LIST, - ) - ), - ), - "expect_table_columns_to_match_set": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_SCHEMA, - operator=AssertionStdOperator.EQUAL_TO, - aggregation=AssertionStdAggregation.COLUMNS, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=convert_to_string(kwargs.get("column_set")), - type=AssertionStdParameterType.SET, - ) - ), - ), - "expect_table_column_count_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_SCHEMA, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.COLUMN_COUNT, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - "expect_table_column_count_to_equal": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_SCHEMA, - operator=AssertionStdOperator.EQUAL_TO, - aggregation=AssertionStdAggregation.COLUMN_COUNT, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=convert_to_string(kwargs.get("value")), - type=AssertionStdParameterType.NUMBER, - ) - ), - ), - "expect_column_to_exist": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_SCHEMA, - operator=AssertionStdOperator._NATIVE_, - aggregation=AssertionStdAggregation._NATIVE_, - ), - "expect_table_row_count_to_equal": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_ROWS, - operator=AssertionStdOperator.EQUAL_TO, - aggregation=AssertionStdAggregation.ROW_COUNT, - parameters=AssertionStdParameters( - value=AssertionStdParameter( - value=convert_to_string(kwargs.get("value")), - type=AssertionStdParameterType.NUMBER, - ) - ), - ), - "expect_table_row_count_to_be_between": DataHubStdAssertion( - scope=DatasetAssertionScope.DATASET_ROWS, - operator=AssertionStdOperator.BETWEEN, - aggregation=AssertionStdAggregation.ROW_COUNT, - parameters=get_min_max(kwargs, AssertionStdParameterType.NUMBER), - ), - } - - datasetAssertionInfo = DatasetAssertionInfo( - dataset=dataset, - fields=fields, - operator=AssertionStdOperator._NATIVE_, - aggregation=AssertionStdAggregation._NATIVE_, - nativeType=expectation_type, - nativeParameters={k: convert_to_string(v) for k, v in kwargs.items()}, - scope=DatasetAssertionScope.DATASET_ROWS, - ) - - if expectation_type in known_expectations.keys(): - assertion = known_expectations[expectation_type] - datasetAssertionInfo.scope = assertion.scope - datasetAssertionInfo.aggregation = assertion.aggregation - datasetAssertionInfo.operator = assertion.operator - datasetAssertionInfo.parameters = assertion.parameters - - # Heuristically mapping other expectations - else: - if "column" in kwargs and expectation_type.startswith( - "expect_column_value" - ): - datasetAssertionInfo.scope = DatasetAssertionScope.DATASET_COLUMN - datasetAssertionInfo.aggregation = AssertionStdAggregation.IDENTITY - elif "column" in kwargs: - datasetAssertionInfo.scope = DatasetAssertionScope.DATASET_COLUMN - datasetAssertionInfo.aggregation = AssertionStdAggregation._NATIVE_ - - return AssertionInfo( - type=AssertionType.DATASET, - datasetAssertion=datasetAssertionInfo, - customProperties={"expectation_suite_name": expectation_suite_name}, - ) - - def get_dataset_partitions(self, 
batch_identifier, data_asset): - dataset_partitions = [] - - logger.debug("Finding datasets being validated") - - # for now, we support only v3-api and sqlalchemy execution engine and Pandas engine - is_sql_alchemy = isinstance(data_asset, Validator) and ( - isinstance(data_asset.execution_engine, SqlAlchemyExecutionEngine) - ) - is_pandas = isinstance(data_asset.execution_engine, PandasExecutionEngine) - if is_sql_alchemy or is_pandas: - ge_batch_spec = data_asset.active_batch_spec - partitionSpec = None - batchSpecProperties = { - "data_asset_name": str( - data_asset.active_batch_definition.data_asset_name - ), - "datasource_name": str( - data_asset.active_batch_definition.datasource_name - ), - } - sqlalchemy_uri = None - if is_sql_alchemy and isinstance( - data_asset.execution_engine.engine, Engine - ): - sqlalchemy_uri = data_asset.execution_engine.engine.url - # For snowflake sqlalchemy_execution_engine.engine is actually instance of Connection - elif is_sql_alchemy and isinstance( - data_asset.execution_engine.engine, Connection - ): - sqlalchemy_uri = data_asset.execution_engine.engine.engine.url - - if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec): - # e.g. ConfiguredAssetSqlDataConnector with splitter_method or sampling_method - schema_name = ge_batch_spec.get("schema_name") - table_name = ge_batch_spec.get("table_name") - - dataset_urn = make_dataset_urn_from_sqlalchemy_uri( - sqlalchemy_uri, - schema_name, - table_name, - self.env, - self.get_platform_instance( - data_asset.active_batch_definition.datasource_name - ), - self.exclude_dbname, - self.platform_alias, - self.convert_urns_to_lowercase, - ) - batchSpec = BatchSpec( - nativeBatchId=batch_identifier, - customProperties=batchSpecProperties, - ) - - splitter_method = ge_batch_spec.get("splitter_method") - if ( - splitter_method is not None - and splitter_method != "_split_on_whole_table" - ): - batch_identifiers = ge_batch_spec.get("batch_identifiers", {}) - partitionSpec = PartitionSpecClass( - partition=convert_to_string(batch_identifiers) - ) - sampling_method = ge_batch_spec.get("sampling_method", "") - if sampling_method == "_sample_using_limit": - batchSpec.limit = ge_batch_spec["sampling_kwargs"]["n"] - - dataset_partitions.append( - { - "dataset_urn": dataset_urn, - "partitionSpec": partitionSpec, - "batchSpec": batchSpec, - } - ) - elif isinstance(ge_batch_spec, RuntimeQueryBatchSpec): - if not self.parse_table_names_from_sql: - warn( - "Enable parse_table_names_from_sql in DatahubValidationAction config\ - to try to parse the tables being asserted from SQL query" - ) - return [] - query = data_asset.batches[ - batch_identifier - ].batch_request.runtime_parameters["query"] - partitionSpec = PartitionSpecClass( - type=PartitionTypeClass.QUERY, - partition=f"Query_{builder.datahub_guid(pre_json_transform(query))}", - ) - - batchSpec = BatchSpec( - nativeBatchId=batch_identifier, - query=query, - customProperties=batchSpecProperties, - ) - try: - tables = DefaultSQLParser(query).get_tables() - except Exception as e: - logger.warning(f"Sql parser failed on {query} with {e}") - tables = [] - - if len(set(tables)) != 1: - warn( - "DataHubValidationAction does not support cross dataset assertions." 
- ) - return [] - for table in tables: - dataset_urn = make_dataset_urn_from_sqlalchemy_uri( - sqlalchemy_uri, - None, - table, - self.env, - self.get_platform_instance( - data_asset.active_batch_definition.datasource_name - ), - self.exclude_dbname, - self.platform_alias, - self.convert_urns_to_lowercase, - ) - dataset_partitions.append( - { - "dataset_urn": dataset_urn, - "partitionSpec": partitionSpec, - "batchSpec": batchSpec, - } - ) - elif isinstance(ge_batch_spec, RuntimeDataBatchSpec): - data_platform = self.get_platform_instance( - data_asset.active_batch_definition.datasource_name - ) - dataset_urn = builder.make_dataset_urn_with_platform_instance( - platform=data_platform - if self.platform_alias is None - else self.platform_alias, - name=data_asset.active_batch_definition.datasource_name, - platform_instance="", - env=self.env, - ) - batchSpec = BatchSpec( - nativeBatchId=batch_identifier, - query="", - customProperties=batchSpecProperties, - ) - dataset_partitions.append( - { - "dataset_urn": dataset_urn, - "partitionSpec": partitionSpec, - "batchSpec": batchSpec, - } - ) - else: - warn( - "DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format( - batch_spec_type=type(ge_batch_spec) - ) - ) - else: - # TODO - v2-spec - SqlAlchemyDataset support - warn( - "DataHubValidationAction does not recognize this GE data asset type - {asset_type}. This is either using v2-api or execution engine other than sqlalchemy.".format( - asset_type=type(data_asset) - ) - ) - - return dataset_partitions - - def get_platform_instance(self, datasource_name): - if self.platform_instance_map and datasource_name in self.platform_instance_map: - return self.platform_instance_map[datasource_name] - else: - warn( - f"Datasource {datasource_name} is not present in platform_instance_map" - ) - return None - - -def parse_int_or_default(value, default_value=None): - if value is None: - return default_value - else: - return int(value) - - -def make_dataset_urn_from_sqlalchemy_uri( - sqlalchemy_uri, - schema_name, - table_name, - env, - platform_instance=None, - exclude_dbname=None, - platform_alias=None, - convert_urns_to_lowercase=False, -): - data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri)) - url_instance = make_url(sqlalchemy_uri) - - if schema_name is None and "." in table_name: - schema_name, table_name = table_name.split(".")[-2:] - - if data_platform in ["redshift", "postgres"]: - schema_name = schema_name or "public" - if url_instance.database is None: - warn( - f"DataHubValidationAction failed to locate database name for {data_platform}." - ) - return None - schema_name = ( - schema_name if exclude_dbname else f"{url_instance.database}.{schema_name}" - ) - elif data_platform == "mssql": - schema_name = schema_name or "dbo" - if url_instance.database is None: - warn( - f"DataHubValidationAction failed to locate database name for {data_platform}." - ) - return None - schema_name = ( - schema_name if exclude_dbname else f"{url_instance.database}.{schema_name}" - ) - elif data_platform in ["trino", "snowflake"]: - if schema_name is None or url_instance.database is None: - warn( - "DataHubValidationAction failed to locate schema name and/or database name for {data_platform}.".format( - data_platform=data_platform - ) - ) - return None - # If data platform is snowflake, we artificially lowercase the Database name. - # This is because DataHub also does this during ingestion. 
- # Ref: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L155 - database_name = ( - url_instance.database.lower() - if data_platform == "snowflake" - else url_instance.database - ) - if database_name.endswith(f"/{schema_name}"): - database_name = database_name[: -len(f"/{schema_name}")] - schema_name = ( - schema_name if exclude_dbname else f"{database_name}.{schema_name}" - ) - - elif data_platform == "bigquery": - if url_instance.host is None or url_instance.database is None: - warn( - "DataHubValidationAction failed to locate host and/or database name for {data_platform}. ".format( - data_platform=data_platform - ) - ) - return None - schema_name = f"{url_instance.host}.{url_instance.database}" - - schema_name = schema_name or url_instance.database - if schema_name is None: - warn( - f"DataHubValidationAction failed to locate schema name for {data_platform}." - ) - return None - - dataset_name = f"{schema_name}.{table_name}" - - if convert_urns_to_lowercase: - dataset_name = dataset_name.lower() - - dataset_urn = builder.make_dataset_urn_with_platform_instance( - platform=data_platform if platform_alias is None else platform_alias, - name=dataset_name, - platform_instance=platform_instance, - env=env, - ) - - return dataset_urn - - -@dataclass -class DataHubStdAssertion: - scope: Union[str, DatasetAssertionScope] - operator: Union[str, AssertionStdOperator] - aggregation: Union[str, AssertionStdAggregation] - parameters: Optional[AssertionStdParameters] = None - - -class DecimalEncoder(json.JSONEncoder): - def default(self, o): - if isinstance(o, Decimal): - return str(o) - return super().default(o) - - -def convert_to_string(var: Any) -> str: - try: - tmp = ( - str(var) - if isinstance(var, (str, int, float)) - else json.dumps(var, cls=DecimalEncoder) - ) - except TypeError as e: - logger.debug(e) - tmp = str(var) - return tmp - - -def warn(msg): - logger.warning(msg) +__all__ = ["DataHubValidationAction"] diff --git a/metadata-ingestion/src/datahub/testing/docker_utils.py b/metadata-ingestion/src/datahub/testing/docker_utils.py new file mode 100644 index 0000000000000..7c1c0304f480e --- /dev/null +++ b/metadata-ingestion/src/datahub/testing/docker_utils.py @@ -0,0 +1,70 @@ +import contextlib +import logging +import subprocess +from typing import Callable, Iterator, List, Optional, Union + +import pytest +import pytest_docker.plugin + +logger = logging.getLogger(__name__) + + +def is_responsive(container_name: str, port: int, hostname: Optional[str]) -> bool: + """A cheap way to figure out if a port is responsive on a container""" + if hostname: + cmd = f"docker exec {container_name} /bin/bash -c 'echo -n > /dev/tcp/{hostname}/{port}'" + else: + # use the hostname of the container + cmd = f"docker exec {container_name} /bin/bash -c 'c_host=`hostname`;echo -n > /dev/tcp/$c_host/{port}'" + ret = subprocess.run( + cmd, + shell=True, + ) + return ret.returncode == 0 + + +def wait_for_port( + docker_services: pytest_docker.plugin.Services, + container_name: str, + container_port: int, + hostname: Optional[str] = None, + timeout: float = 30.0, + pause: float = 0.5, + checker: Optional[Callable[[], bool]] = None, +) -> None: + try: + docker_services.wait_until_responsive( + timeout=timeout, + pause=pause, + check=( + checker + if checker + else lambda: is_responsive(container_name, container_port, hostname) + ), + ) + logger.info(f"Container {container_name} is ready!") + finally: + # use check=True to raise 
an error if command gave bad exit code + subprocess.run(f"docker logs {container_name}", shell=True, check=True) + + +@pytest.fixture(scope="module") +def docker_compose_runner( + docker_compose_command, docker_compose_project_name, docker_setup, docker_cleanup +): + @contextlib.contextmanager + def run( + compose_file_path: Union[str, List[str]], key: str, cleanup: bool = True + ) -> Iterator[pytest_docker.plugin.Services]: + with pytest_docker.plugin.get_docker_services( + docker_compose_command=docker_compose_command, + # We can remove the type ignore once this is merged: + # https://github.com/avast/pytest-docker/pull/108 + docker_compose_file=compose_file_path, # type: ignore + docker_compose_project_name=f"{docker_compose_project_name}-{key}", + docker_setup=docker_setup, + docker_cleanup=docker_cleanup if cleanup else [], + ) as docker_services: + yield docker_services + + return run diff --git a/metadata-ingestion/tests/test_helpers/docker_helpers.py b/metadata-ingestion/tests/test_helpers/docker_helpers.py index bacb8d80b9e72..20aec975787e4 100644 --- a/metadata-ingestion/tests/test_helpers/docker_helpers.py +++ b/metadata-ingestion/tests/test_helpers/docker_helpers.py @@ -1,52 +1,16 @@ -import contextlib import logging import os import subprocess -from typing import Callable, Iterator, List, Optional, Union import pytest -import pytest_docker.plugin -logger = logging.getLogger(__name__) - - -def is_responsive(container_name: str, port: int, hostname: Optional[str]) -> bool: - """A cheap way to figure out if a port is responsive on a container""" - if hostname: - cmd = f"docker exec {container_name} /bin/bash -c 'echo -n > /dev/tcp/{hostname}/{port}'" - else: - # use the hostname of the container - cmd = f"docker exec {container_name} /bin/bash -c 'c_host=`hostname`;echo -n > /dev/tcp/$c_host/{port}'" - ret = subprocess.run( - cmd, - shell=True, - ) - return ret.returncode == 0 +from datahub.testing.docker_utils import ( # noqa: F401 + docker_compose_runner, + is_responsive, + wait_for_port, +) - -def wait_for_port( - docker_services: pytest_docker.plugin.Services, - container_name: str, - container_port: int, - hostname: Optional[str] = None, - timeout: float = 30.0, - pause: float = 0.5, - checker: Optional[Callable[[], bool]] = None, -) -> None: - try: - docker_services.wait_until_responsive( - timeout=timeout, - pause=pause, - check=( - checker - if checker - else lambda: is_responsive(container_name, container_port, hostname) - ), - ) - logger.info(f"Container {container_name} is ready!") - finally: - # use check=True to raise an error if command gave bad exit code - subprocess.run(f"docker logs {container_name}", shell=True, check=True) +logger = logging.getLogger(__name__) @pytest.fixture(scope="session") @@ -58,28 +22,6 @@ def docker_compose_command(): return "docker compose" -@pytest.fixture(scope="module") -def docker_compose_runner( - docker_compose_command, docker_compose_project_name, docker_setup, docker_cleanup -): - @contextlib.contextmanager - def run( - compose_file_path: Union[str, List[str]], key: str, cleanup: bool = True - ) -> Iterator[pytest_docker.plugin.Services]: - with pytest_docker.plugin.get_docker_services( - docker_compose_command=docker_compose_command, - # We can remove the type ignore once this is merged: - # https://github.com/avast/pytest-docker/pull/108 - docker_compose_file=compose_file_path, # type: ignore - docker_compose_project_name=f"{docker_compose_project_name}-{key}", - docker_setup=docker_setup, - docker_cleanup=docker_cleanup if 
cleanup else [], - ) as docker_services: - yield docker_services - - return run - - def cleanup_image(image_name: str) -> None: assert ":" not in image_name, "image_name should not contain a tag" diff --git a/settings.gradle b/settings.gradle index b850816ab5e6b..899ca8f6f869b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -61,6 +61,7 @@ include 'metadata-integration:java:openlineage-converter' include 'metadata-integration:java:acryl-spark-lineage' include 'ingestion-scheduler' include 'metadata-ingestion-modules:airflow-plugin' +include 'metadata-ingestion-modules:gx-plugin' include 'metadata-ingestion-modules:dagster-plugin' include 'smoke-test' include 'metadata-auth:auth-api'
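One practical consequence of the `action.py` rewrite in this diff is that the legacy module becomes a thin re-export shim around the new `datahub_gx_plugin` package. The following is a minimal compatibility sketch, not part of the diff itself; it assumes both `acryl-datahub` and `acryl-datahub-gx-plugin` are installed (the reworked `great-expectations` extra pins the latter).

```python
# Compatibility sketch: the legacy import path now only re-exports the action
# from the new datahub-gx-plugin package, so both imports resolve to the same
# class. Assumes acryl-datahub and acryl-datahub-gx-plugin are both installed.
from datahub.integrations.great_expectations.action import (
    DataHubValidationAction as LegacyDataHubValidationAction,
)
from datahub_gx_plugin.action import DataHubValidationAction

# Existing checkpoint configs that reference the old module keep working, but
# new checkpoints should use `module_name: datahub_gx_plugin.action`.
assert LegacyDataHubValidationAction is DataHubValidationAction
```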
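The helpers relocated into `datahub.testing.docker_utils` are intended to be shared by the plugin test suites. A hedged usage sketch follows, modeled on the relocated `test_ge_ingest`; the compose path, project key, container name, and port are placeholders rather than values from the diff, and `pytest-docker` (added to `test_api_requirements` above) must be installed.

```python
# Sketch of reusing the relocated docker helpers from a plugin test module.
# Importing the fixture at module (or conftest.py) level registers it with
# pytest, exactly as the new gx-plugin conftest does.
from datahub.testing.docker_utils import docker_compose_runner, wait_for_port  # noqa: F401


def test_service_comes_up(docker_compose_runner, pytestconfig):
    # Placeholder compose file path; adjust to the module's test layout.
    compose_file = pytestconfig.rootpath / "tests/integration/docker-compose.yml"

    # Each (compose file, key) pair gets its own docker-compose project name,
    # so concurrent test modules do not collide.
    with docker_compose_runner(compose_file, "my-plugin-tests") as docker_services:
        # Blocks until the container accepts TCP connections on the port and
        # dumps the container logs if it never becomes responsive.
        wait_for_port(docker_services, "my_postgres", 5432, timeout=60.0)
        # ... run checkpoints / assertions against the running container ...
```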