diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index c268a66938945..532669c44722c 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -63,8 +63,8 @@ jobs: env: ENABLE_PUBLISH: ${{ secrets.DOCKER_PASSWORD != '' && secrets.ACRYL_DOCKER_PASSWORD != '' }} run: | - echo "Enable publish: ${{ env.ENABLE_PUBLISH != '' }}" - echo "publish=${{ env.ENABLE_PUBLISH != '' }}" >> $GITHUB_OUTPUT + echo "Enable publish: ${{ env.ENABLE_PUBLISH }}" + echo "publish=${{ env.ENABLE_PUBLISH }}" >> $GITHUB_OUTPUT gms_build: name: Build and Push DataHub GMS Docker Image @@ -451,8 +451,6 @@ jobs: tags: ${{ needs.setup.outputs.tag }} username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} - build-args: | - DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} publish: ${{ needs.setup.outputs.publish }} context: . file: ./docker/datahub-ingestion-base/Dockerfile @@ -481,7 +479,7 @@ jobs: uses: ishworkh/docker-image-artifact-download@v1 if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} + image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} - name: Build and push Base-Slim Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }} uses: ./.github/actions/docker-custom-build-and-push @@ -493,16 +491,15 @@ jobs: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} build-args: | - DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} APP_ENV=slim - BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} + BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} publish: ${{ needs.setup.outputs.publish }} context: . file: ./docker/datahub-ingestion-base/Dockerfile platforms: linux/amd64,linux/arm64/v8 - name: Compute DataHub Ingestion (Base-Slim) Tag id: tag - run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.slim_tag || 'head' }}" >> $GITHUB_OUTPUT + run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }}" >> $GITHUB_OUTPUT datahub_ingestion_base_full_build: name: Build and Push DataHub Ingestion (Base-Full) Docker Image runs-on: ubuntu-latest @@ -524,7 +521,7 @@ jobs: uses: ishworkh/docker-image-artifact-download@v1 if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} + image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} - name: Build and push Base-Full Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }} uses: ./.github/actions/docker-custom-build-and-push @@ -532,20 +529,19 @@ jobs: target: full-install images: | ${{ env.DATAHUB_INGESTION_BASE_IMAGE }} - tags: ${{ needs.setup.outputs.full_tag }} + tags: ${{ needs.setup.outputs.unique_full_tag }} username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} build-args: | - DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} APP_ENV=full - BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }} + BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} publish: ${{ needs.setup.outputs.publish }} context: . file: ./docker/datahub-ingestion-base/Dockerfile platforms: linux/amd64,linux/arm64/v8 - name: Compute DataHub Ingestion (Base-Full) Tag id: tag - run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }}" >> $GITHUB_OUTPUT + run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT datahub_ingestion_slim_build: @@ -553,6 +549,7 @@ jobs: runs-on: ubuntu-latest outputs: tag: ${{ steps.tag.outputs.tag }} + needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.publish != 'true' }} needs: [setup, datahub_ingestion_base_slim_build] steps: - name: Check out the repo @@ -572,9 +569,9 @@ jobs: run: ./gradlew :metadata-ingestion:codegen - name: Download Base Image uses: ishworkh/docker-image-artifact-download@v1 - if: ${{ needs.setup.outputs.publish != 'true' }} + if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.slim_tag || 'head' }} + image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }} - name: Build and push Slim Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' }} uses: ./.github/actions/docker-custom-build-and-push @@ -584,7 +581,7 @@ jobs: ${{ env.DATAHUB_INGESTION_IMAGE }} build-args: | BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }} - DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.slim_tag || 'head' }} + DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }} APP_ENV=slim tags: ${{ needs.setup.outputs.slim_tag }} username: ${{ secrets.ACRYL_DOCKER_USERNAME }} @@ -595,7 +592,7 @@ jobs: platforms: linux/amd64,linux/arm64/v8 - name: Compute Tag id: tag - run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.slim_tag || 'head' }}" >> $GITHUB_OUTPUT + run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_slim_tag || 'head' }}" >> $GITHUB_OUTPUT datahub_ingestion_slim_scan: permissions: contents: read # for actions/checkout to fetch code @@ -609,15 +606,15 @@ jobs: uses: actions/checkout@v3 - name: Download image Slim Image uses: ishworkh/docker-image-artifact-download@v1 - if: ${{ needs.setup.outputs.publish != 'true' }} + if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.slim_tag }} + image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }} - name: Run Trivy vulnerability scanner Slim Image uses: aquasecurity/trivy-action@0.8.0 env: TRIVY_OFFLINE_SCAN: true with: - image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.slim_tag }} + image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }} format: "template" template: "@/contrib/sarif.tpl" output: "trivy-results.sarif" @@ -634,6 +631,7 @@ jobs: runs-on: ubuntu-latest outputs: tag: ${{ steps.tag.outputs.tag }} + needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.publish != 'true' }} needs: [setup, datahub_ingestion_base_full_build] steps: - name: Check out the repo @@ -653,9 +651,9 @@ jobs: run: ./gradlew :metadata-ingestion:codegen - name: Download Base Image uses: ishworkh/docker-image-artifact-download@v1 - if: ${{ needs.setup.outputs.publish != 'true' }} + if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }} + image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }} - name: Build and push Full Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' }} uses: ./.github/actions/docker-custom-build-and-push @@ -665,8 +663,8 @@ jobs: ${{ env.DATAHUB_INGESTION_IMAGE }} build-args: | BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }} - DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }} - tags: ${{ needs.setup.outputs.full_tag }} + DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }} + tags: ${{ needs.setup.outputs.unique_full_tag }} username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} publish: ${{ needs.setup.outputs.publish }} @@ -675,7 +673,7 @@ jobs: platforms: linux/amd64,linux/arm64/v8 - name: Compute Tag (Full) id: tag - run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }}" >> $GITHUB_OUTPUT + run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT datahub_ingestion_full_scan: permissions: contents: read # for actions/checkout to fetch code @@ -689,15 +687,15 @@ jobs: uses: actions/checkout@v3 - name: Download image Full Image uses: ishworkh/docker-image-artifact-download@v1 - if: ${{ needs.setup.outputs.publish != 'true' }} + if: ${{ needs.datahub_ingestion_full_build.outputs.needs_artifact_download == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.full_tag }} + image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.tag }} - name: Run Trivy vulnerability scanner Full Image uses: aquasecurity/trivy-action@0.8.0 env: TRIVY_OFFLINE_SCAN: true with: - image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.full_tag }} + image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.tag }} format: "template" template: "@/contrib/sarif.tpl" output: "trivy-results.sarif" @@ -750,6 +748,10 @@ jobs: ./gradlew :metadata-ingestion:install - name: Disk Check run: df -h . && docker images + - name: Remove images + run: docker image prune -a -f || true + - name: Disk Check + run: df -h . && docker images - name: Download GMS image uses: ishworkh/docker-image-artifact-download@v1 if: ${{ needs.setup.outputs.publish != 'true' }} @@ -792,9 +794,9 @@ jobs: image: ${{ env.DATAHUB_UPGRADE_IMAGE }}:${{ needs.setup.outputs.unique_tag }} - name: Download datahub-ingestion-slim image uses: ishworkh/docker-image-artifact-download@v1 - if: ${{ needs.setup.outputs.publish != 'true' }} + if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.setup.outputs.unique_tag }} + image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }} - name: Disk Check run: df -h . && docker images - name: run quickstart @@ -812,6 +814,8 @@ jobs: # we are doing this because gms takes time to get ready # and we don't have a better readiness check when bootstrap is done sleep 60s + - name: Disk Check + run: df -h . && docker images - name: Disable ES Disk Threshold run: | curl -XPUT "http://localhost:9200/_cluster/settings" \ diff --git a/docker/build.gradle b/docker/build.gradle index 829bc344411f3..ae101fe1defc5 100644 --- a/docker/build.gradle +++ b/docker/build.gradle @@ -87,6 +87,7 @@ task quickstartDebug(type: Exec, dependsOn: ':metadata-ingestion:install') { dependsOn(debug_modules.collect { it + ':dockerTagDebug' }) shouldRunAfter ':metadata-ingestion:clean', 'quickstartNuke' + environment "DATAHUB_PRECREATE_TOPICS", "true" environment "DATAHUB_TELEMETRY_ENABLED", "false" environment "DOCKER_COMPOSE_BASE", "file://${rootProject.projectDir}" diff --git a/docker/datahub-ingestion-base/Dockerfile b/docker/datahub-ingestion-base/Dockerfile index bb4b0bc42e167..3d47f79617370 100644 --- a/docker/datahub-ingestion-base/Dockerfile +++ b/docker/datahub-ingestion-base/Dockerfile @@ -84,4 +84,4 @@ FROM ${BASE_IMAGE} as slim-install FROM ${APP_ENV}-install USER datahub -ENV PATH="/datahub-ingestion/.local/bin:$PATH" +ENV PATH="/datahub-ingestion/.local/bin:$PATH" \ No newline at end of file diff --git a/docker/datahub-ingestion/Dockerfile b/docker/datahub-ingestion/Dockerfile index d16caea2fcecd..0ecc30d02ac3f 100644 --- a/docker/datahub-ingestion/Dockerfile +++ b/docker/datahub-ingestion/Dockerfile @@ -30,4 +30,4 @@ FROM base as dev-install FROM ${APP_ENV}-install as final USER datahub -ENV PATH="/datahub-ingestion/.local/bin:$PATH" \ No newline at end of file +ENV PATH="/datahub-ingestion/.local/bin:$PATH" diff --git a/docker/kafka-setup/Dockerfile b/docker/kafka-setup/Dockerfile index 8cf9d0869dc9b..5707234b85f57 100644 --- a/docker/kafka-setup/Dockerfile +++ b/docker/kafka-setup/Dockerfile @@ -1,5 +1,7 @@ +ARG KAFKA_DOCKER_VERSION=7.4.1 + # Using as a base image because to get the needed jars for confluent utils -FROM confluentinc/cp-base-new@sha256:ac4e0f9bcaecdab728740529f37452231fa40760fcf561759fc3b219f46d2cc9 as confluent_base +FROM confluentinc/cp-base-new:$KAFKA_DOCKER_VERSION as confluent_base ARG MAVEN_REPO="https://repo1.maven.org/maven2" ARG SNAKEYAML_VERSION="2.0" @@ -16,12 +18,6 @@ ENV SCALA_VERSION 2.13 # Set the classpath for JARs required by `cub` ENV CUB_CLASSPATH='"/usr/share/java/cp-base-new/*"' -# Confluent Docker Utils Version (Namely the tag or branch to grab from git to install) -ARG PYTHON_CONFLUENT_DOCKER_UTILS_VERSION="v0.0.60" - -# This can be overriden for an offline/air-gapped builds -ARG PYTHON_CONFLUENT_DOCKER_UTILS_INSTALL_SPEC="git+https://github.com/confluentinc/confluent-docker-utils@${PYTHON_CONFLUENT_DOCKER_UTILS_VERSION}" - LABEL name="kafka" version=${KAFKA_VERSION} RUN apk add --no-cache bash coreutils @@ -39,7 +35,6 @@ RUN mkdir -p /opt \ && pip install --no-cache-dir --upgrade pip wheel setuptools \ && pip install jinja2 requests \ && pip install "Cython<3.0" "PyYAML<6" --no-build-isolation \ - && pip install --prefer-binary --prefix=/usr/local --upgrade "${PYTHON_CONFLUENT_DOCKER_UTILS_INSTALL_SPEC}" \ && rm -rf /tmp/* \ && apk del --purge .build-deps @@ -69,7 +64,8 @@ ENV USE_CONFLUENT_SCHEMA_REGISTRY="TRUE" COPY docker/kafka-setup/kafka-setup.sh ./kafka-setup.sh COPY docker/kafka-setup/kafka-config.sh ./kafka-config.sh COPY docker/kafka-setup/kafka-topic-workers.sh ./kafka-topic-workers.sh +COPY docker/kafka-setup/kafka-ready.sh ./kafka-ready.sh -RUN chmod +x ./kafka-setup.sh && chmod +x ./kafka-topic-workers.sh +RUN chmod +x ./kafka-setup.sh ./kafka-topic-workers.sh ./kafka-ready.sh CMD ./kafka-setup.sh diff --git a/docker/kafka-setup/kafka-ready.sh b/docker/kafka-setup/kafka-ready.sh new file mode 100755 index 0000000000000..ba87bde047ef5 --- /dev/null +++ b/docker/kafka-setup/kafka-ready.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +for i in {1..60} +do + kafka-broker-api-versions.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER + if [ $? -eq 0 ]; then + break + fi + if [ $i -eq 60 ]; then + echo "Kafka bootstrap server $KAFKA_BOOTSTRAP_SERVER not ready." + exit 1 + fi + sleep 5s +done diff --git a/docker/kafka-setup/kafka-setup.sh b/docker/kafka-setup/kafka-setup.sh old mode 100644 new mode 100755 index 7b015421b7963..629e9bc9484ee --- a/docker/kafka-setup/kafka-setup.sh +++ b/docker/kafka-setup/kafka-setup.sh @@ -49,8 +49,8 @@ if [[ -n "$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" ]]; then echo "sasl.client.callback.handler.class=$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" >> $CONNECTION_PROPERTIES_PATH fi -cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180 - +# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180 +. kafka-ready.sh ############################################################ # Start Topic Creation Logic diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 31d067f984d2d..ffa685fb25826 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -121,6 +121,12 @@ class DataPlatformPair: powerbi_data_platform_name: str +@dataclass +class PowerBIPlatformDetail: + data_platform_pair: DataPlatformPair + data_platform_server: str + + class SupportedDataPlatform(Enum): POSTGRES_SQL = DataPlatformPair( powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres" @@ -382,6 +388,15 @@ class PowerBiDashboardSourceConfig( description="The instance of the platform that all assets produced by this recipe belong to", ) + # Enable advance sql construct + enable_advance_lineage_sql_construct: bool = pydantic.Field( + default=False, + description="Whether to enable advance native sql construct for parsing like join, sub-queries. " + "along this flag , the native_query_parsing should be enabled. " + "By default convert_lineage_urns_to_lowercase is enabled, in-case if you have disabled it in previous ingestion execution then it may break lineage " + "as this option generates the upstream datasets URN in lowercase.", + ) + @validator("dataset_type_mapping") @classmethod def map_data_platform(cls, value): diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py index 396da2d79e3b7..baaa8d5b85ae1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py @@ -5,8 +5,8 @@ from datahub.ingestion.source.powerbi.config import ( PlatformDetail, PowerBiDashboardSourceConfig, + PowerBIPlatformDetail, ) -from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable logger = logging.getLogger(__name__) @@ -14,7 +14,7 @@ class AbstractDataPlatformInstanceResolver(ABC): @abstractmethod def get_platform_instance( - self, dataplatform_table: DataPlatformTable + self, data_platform_detail: PowerBIPlatformDetail ) -> PlatformDetail: pass @@ -32,10 +32,10 @@ class ResolvePlatformInstanceFromDatasetTypeMapping( BaseAbstractDataPlatformInstanceResolver ): def get_platform_instance( - self, dataplatform_table: DataPlatformTable + self, data_platform_detail: PowerBIPlatformDetail ) -> PlatformDetail: platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[ - dataplatform_table.data_platform_pair.powerbi_data_platform_name + data_platform_detail.data_platform_pair.powerbi_data_platform_name ] if isinstance(platform, PlatformDetail): @@ -48,13 +48,13 @@ class ResolvePlatformInstanceFromServerToPlatformInstance( BaseAbstractDataPlatformInstanceResolver ): def get_platform_instance( - self, dataplatform_table: DataPlatformTable + self, data_platform_detail: PowerBIPlatformDetail ) -> PlatformDetail: return ( self.config.server_to_platform_instance[ - dataplatform_table.datasource_server + data_platform_detail.data_platform_server ] - if dataplatform_table.datasource_server + if data_platform_detail.data_platform_server in self.config.server_to_platform_instance else PlatformDetail.parse_obj({}) ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py index 640bc4bd60d80..021c429c3c633 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py @@ -1,8 +1,12 @@ import logging -from typing import List +from typing import List, Optional import sqlparse +import datahub.utilities.sqlglot_lineage as sqlglot_l +from datahub.ingestion.api.common import PipelineContext +from datahub.utilities.sqlglot_lineage import SqlParsingResult + SPECIAL_CHARACTERS = ["#(lf)", "(lf)"] logger = logging.getLogger() @@ -45,3 +49,30 @@ def get_tables(native_query: str) -> List[str]: from_index = from_index + 1 return tables + + +def parse_custom_sql( + ctx: PipelineContext, + query: str, + schema: Optional[str], + database: Optional[str], + platform: str, + env: str, + platform_instance: Optional[str], +) -> Optional["SqlParsingResult"]: + + logger.debug("Using sqlglot_lineage to parse custom sql") + + sql_query = remove_special_characters(query) + + logger.debug(f"Parsing sql={sql_query}") + + return sqlglot_l.create_lineage_sql_parsed_result( + query=sql_query, + schema=schema, + database=database, + platform=platform, + platform_instance=platform_instance, + env=env, + graph=ctx.graph, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index 83106c04529d1..8cc38c366c42a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -6,7 +6,14 @@ import lark from lark import Lark, Tree -from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.powerbi.config import ( + PowerBiDashboardSourceConfig, + PowerBiDashboardSourceReport, +) +from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( + AbstractDataPlatformInstanceResolver, +) from datahub.ingestion.source.powerbi.m_query import resolver, validator from datahub.ingestion.source.powerbi.m_query.data_classes import ( TRACE_POWERBI_MQUERY_PARSER, @@ -45,7 +52,9 @@ def _parse_expression(expression: str) -> Tree: def get_upstream_tables( table: Table, reporter: PowerBiDashboardSourceReport, - native_query_enabled: bool = True, + platform_instance_resolver: AbstractDataPlatformInstanceResolver, + ctx: PipelineContext, + config: PowerBiDashboardSourceConfig, parameters: Dict[str, str] = {}, ) -> List[resolver.DataPlatformTable]: if table.expression is None: @@ -58,7 +67,7 @@ def get_upstream_tables( parse_tree: Tree = _parse_expression(table.expression) valid, message = validator.validate_parse_tree( - parse_tree, native_query_enabled=native_query_enabled + parse_tree, native_query_enabled=config.native_query_parsing ) if valid is False: assert message is not None @@ -84,7 +93,11 @@ def get_upstream_tables( parse_tree=parse_tree, reporter=reporter, parameters=parameters, - ).resolve_to_data_platform_table_list() + ).resolve_to_data_platform_table_list( + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + ) except BaseException as e: reporter.report_warning(table.full_name, "Failed to process m-query expression") diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index e2b448124c89d..479f1decff903 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -6,11 +6,19 @@ from lark import Tree +import datahub.emitter.mce_builder as builder +from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.powerbi.config import ( DataPlatformPair, + PlatformDetail, + PowerBiDashboardSourceConfig, PowerBiDashboardSourceReport, + PowerBIPlatformDetail, SupportedDataPlatform, ) +from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( + AbstractDataPlatformInstanceResolver, +) from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function from datahub.ingestion.source.powerbi.m_query.data_classes import ( TRACE_POWERBI_MQUERY_PARSER, @@ -19,19 +27,98 @@ IdentifierAccessor, ) from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table +from datahub.utilities.sqlglot_lineage import SqlParsingResult logger = logging.getLogger(__name__) @dataclass class DataPlatformTable: - name: str - full_name: str - datasource_server: str data_platform_pair: DataPlatformPair + urn: str + + +def urn_to_lowercase(value: str, flag: bool) -> str: + if flag is True: + return value.lower() + + return value + + +def urn_creator( + config: PowerBiDashboardSourceConfig, + platform_instance_resolver: AbstractDataPlatformInstanceResolver, + data_platform_pair: DataPlatformPair, + server: str, + qualified_table_name: str, +) -> str: + + platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance( + PowerBIPlatformDetail( + data_platform_pair=data_platform_pair, + data_platform_server=server, + ) + ) + + return builder.make_dataset_urn_with_platform_instance( + platform=data_platform_pair.datahub_data_platform_name, + platform_instance=platform_detail.platform_instance, + env=platform_detail.env, + name=urn_to_lowercase( + qualified_table_name, config.convert_lineage_urns_to_lowercase + ), + ) class AbstractDataPlatformTableCreator(ABC): + """ + Base class to share common functionalities among different dataplatform for M-Query parsing. + + To create qualified table name we need to parse M-Query data-access-functions(https://learn.microsoft.com/en-us/powerquery-m/accessing-data-functions) and + the data-access-functions has some define pattern to access database-name, schema-name and table-name, for example see below M-Query. + + let + Source = Sql.Database("localhost", "library"), + dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data] + in + dbo_book_issue + + It is MSSQL M-Query and Sql.Database is the data-access-function to access MSSQL. If this function is available in M-Query then database name is available in second argument + of first statement and schema-name and table-name is available in second statement. second statement can be repeated to access different tables from MSSQL. + + DefaultTwoStepDataAccessSources extends the AbstractDataPlatformTableCreator and provides the common functionalities for data-platform which has above type of M-Query pattern + + data-access-function varies as per data-platform for example for MySQL.Database for MySQL, PostgreSQL.Database for Postgres and Oracle.Database for Oracle and number of statement to + find out database-name , schema-name and table-name also varies as per dataplatform. + + Value.NativeQuery is one of the function which is used to execute native query inside M-Query, for example see below M-Query + + let + Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) + in + Source + + In this M-Query database-name is available in first argument and rest of the detail i.e database & schema is available in native query. + + NativeQueryDataPlatformTableCreator extends AbstractDataPlatformTableCreator to support Redshift and Snowflake native query parsing. + + """ + + ctx: PipelineContext + config: PowerBiDashboardSourceConfig + platform_instance_resolver: AbstractDataPlatformInstanceResolver + + def __init__( + self, + ctx: PipelineContext, + config: PowerBiDashboardSourceConfig, + platform_instance_resolver: AbstractDataPlatformInstanceResolver, + ) -> None: + super().__init__() + self.ctx = ctx + self.config = config + self.platform_instance_resolver = platform_instance_resolver + @abstractmethod def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail @@ -58,6 +145,49 @@ def get_db_detail_from_argument( return arguments[0], arguments[1] + def parse_custom_sql( + self, query: str, server: str, database: Optional[str], schema: Optional[str] + ) -> List[DataPlatformTable]: + + dataplatform_tables: List[DataPlatformTable] = [] + + platform_detail: PlatformDetail = ( + self.platform_instance_resolver.get_platform_instance( + PowerBIPlatformDetail( + data_platform_pair=self.get_platform_pair(), + data_platform_server=server, + ) + ) + ) + + parsed_result: Optional[ + "SqlParsingResult" + ] = native_sql_parser.parse_custom_sql( + ctx=self.ctx, + query=query, + platform=self.get_platform_pair().datahub_data_platform_name, + platform_instance=platform_detail.platform_instance, + env=platform_detail.env, + database=database, + schema=schema, + ) + + if parsed_result is None: + logger.debug("Failed to parse query") + return dataplatform_tables + + for urn in parsed_result.in_tables: + dataplatform_tables.append( + DataPlatformTable( + data_platform_pair=self.get_platform_pair(), + urn=urn, + ) + ) + + logger.debug(f"Generated dataplatform_tables={dataplatform_tables}") + + return dataplatform_tables + class AbstractDataAccessMQueryResolver(ABC): table: Table @@ -80,11 +210,29 @@ def __init__( self.data_access_functions = SupportedResolver.get_function_names() @abstractmethod - def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]: + def resolve_to_data_platform_table_list( + self, + ctx: PipelineContext, + config: PowerBiDashboardSourceConfig, + platform_instance_resolver: AbstractDataPlatformInstanceResolver, + ) -> List[DataPlatformTable]: pass class MQueryResolver(AbstractDataAccessMQueryResolver, ABC): + """ + This class parses the M-Query recursively to generate DataAccessFunctionDetail (see method create_data_access_functional_detail). + + This class has generic code to process M-Query tokens and create instance of DataAccessFunctionDetail. + + Once DataAccessFunctionDetail instance is initialized thereafter MQueryResolver generates the DataPlatformTable with the help of AbstractDataPlatformTableCreator + (see method resolve_to_data_platform_table_list). + + Classes which extended from AbstractDataPlatformTableCreator knows how to convert generated DataAccessFunctionDetail instance + to respective DataPlatformTable instance as per dataplatform. + + """ + def get_item_selector_tokens( self, expression_tree: Tree, @@ -318,9 +466,15 @@ def internal( return table_links - def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]: + def resolve_to_data_platform_table_list( + self, + ctx: PipelineContext, + config: PowerBiDashboardSourceConfig, + platform_instance_resolver: AbstractDataPlatformInstanceResolver, + ) -> List[DataPlatformTable]: data_platform_tables: List[DataPlatformTable] = [] + # Find out output variable as we are doing backtracking in M-Query output_variable: Optional[str] = tree_function.get_output_variable( self.parse_tree ) @@ -332,12 +486,14 @@ def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]: ) return data_platform_tables + # Parse M-Query and use output_variable as root of tree and create instance of DataAccessFunctionDetail table_links: List[ DataAccessFunctionDetail ] = self.create_data_access_functional_detail(output_variable) # Each item is data-access function for f_detail in table_links: + # Get & Check if we support data-access-function available in M-Query supported_resolver = SupportedResolver.get_resolver( f_detail.data_access_function_name ) @@ -351,8 +507,14 @@ def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]: ) continue + # From supported_resolver enum get respective resolver like AmazonRedshift or Snowflake or Oracle or NativeQuery and create instance of it + # & also pass additional information that will be need to generate urn table_full_name_creator: AbstractDataPlatformTableCreator = ( - supported_resolver.get_table_full_name_creator()() + supported_resolver.get_table_full_name_creator()( + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + ) ) data_platform_tables.extend( @@ -393,18 +555,24 @@ def two_level_access_pattern( IdentifierAccessor, data_access_func_detail.identifier_accessor ).items["Item"] - full_table_name: str = f"{db_name}.{schema_name}.{table_name}" + qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" logger.debug( - f"Platform({self.get_platform_pair().datahub_data_platform_name}) full_table_name= {full_table_name}" + f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}" + ) + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, ) return [ DataPlatformTable( - name=table_name, - full_name=full_table_name, - datasource_server=server, data_platform_pair=self.get_platform_pair(), + urn=urn, ) ] @@ -420,9 +588,48 @@ def get_platform_pair(self) -> DataPlatformPair: class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources): + # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16 + DEFAULT_SCHEMA = "dbo" # Default schema name in MS-SQL is dbo + def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.MS_SQL.value + def create_urn_using_old_parser( + self, query: str, db_name: str, server: str + ) -> List[DataPlatformTable]: + dataplatform_tables: List[DataPlatformTable] = [] + + tables: List[str] = native_sql_parser.get_tables(query) + + for table in tables: + schema_and_table: List[str] = table.split(".") + if len(schema_and_table) == 1: + # schema name is not present. set default schema + schema_and_table.insert(0, MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA) + + qualified_table_name = ( + f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}" + ) + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, + ) + + dataplatform_tables.append( + DataPlatformTable( + data_platform_pair=self.get_platform_pair(), + urn=urn, + ) + ) + + logger.debug(f"Generated upstream tables = {dataplatform_tables}") + + return dataplatform_tables + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail ) -> List[DataPlatformTable]: @@ -442,28 +649,20 @@ def create_dataplatform_tables( logger.debug("Unsupported case is found. Second index is not the Query") return dataplatform_tables - db_name: str = arguments[1] - - tables: List[str] = native_sql_parser.get_tables(arguments[3]) - for table in tables: - schema_and_table: List[str] = table.split(".") - if len(schema_and_table) == 1: - # schema name is not present. Default schema name in MS-SQL is dbo - # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16 - schema_and_table.insert(0, "dbo") - - dataplatform_tables.append( - DataPlatformTable( - name=schema_and_table[1], - full_name=f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}", - datasource_server=arguments[0], - data_platform_pair=self.get_platform_pair(), - ) + if self.config.enable_advance_lineage_sql_construct is False: + # Use previous parser to generate URN to keep backward compatibility + return self.create_urn_using_old_parser( + query=arguments[3], + db_name=arguments[1], + server=arguments[0], ) - logger.debug("MS-SQL full-table-names %s", dataplatform_tables) - - return dataplatform_tables + return self.parse_custom_sql( + query=arguments[3], + database=arguments[1], + server=arguments[0], + schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA, + ) class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator): @@ -510,12 +709,20 @@ def create_dataplatform_tables( cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, ).items["Name"] + qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, + ) + return [ DataPlatformTable( - name=table_name, - full_name=f"{db_name}.{schema_name}.{table_name}", - datasource_server=server, data_platform_pair=self.get_platform_pair(), + urn=urn, ) ] @@ -547,14 +754,28 @@ def create_dataplatform_tables( db_name: str = value_dict["Database"] schema_name: str = value_dict["Schema"] table_name: str = value_dict["Table"] + + qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" + server, _ = self.get_db_detail_from_argument(data_access_func_detail.arg_list) + if server is None: + logger.info( + f"server information is not available for {qualified_table_name}. Skipping upstream table" + ) + return [] + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, + ) return [ DataPlatformTable( - name=table_name, - full_name=f"{db_name}.{schema_name}.{table_name}", - datasource_server=server if server else "", data_platform_pair=self.get_platform_pair(), + urn=urn, ) ] @@ -589,20 +810,26 @@ def create_dataplatform_tables( IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next # type: ignore ).items["Name"] - full_table_name: str = f"{db_name}.{schema_name}.{table_name}" + qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" logger.debug( - f"{self.get_platform_pair().datahub_data_platform_name} full-table-name {full_table_name}" + f"{self.get_platform_pair().datahub_data_platform_name} qualified_table_name {qualified_table_name}" + ) + + server: str = self.get_datasource_server(arguments, data_access_func_detail) + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, ) return [ DataPlatformTable( - name=table_name, - full_name=full_table_name, - datasource_server=self.get_datasource_server( - arguments, data_access_func_detail - ), data_platform_pair=self.get_platform_pair(), + urn=urn, ) ] @@ -654,12 +881,20 @@ def create_dataplatform_tables( cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, ).items["Name"] + qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, + ) + return [ DataPlatformTable( - name=table_name, - full_name=f"{db_name}.{schema_name}.{table_name}", - datasource_server=server, data_platform_pair=self.get_platform_pair(), + urn=urn, ) ] @@ -681,6 +916,39 @@ def is_native_parsing_supported(data_access_function_name: str) -> bool: in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM ) + def create_urn_using_old_parser( + self, query: str, server: str + ) -> List[DataPlatformTable]: + dataplatform_tables: List[DataPlatformTable] = [] + + tables: List[str] = native_sql_parser.get_tables(query) + + for qualified_table_name in tables: + if len(qualified_table_name.split(".")) != 3: + logger.debug( + f"Skipping table {qualified_table_name} as it is not as per qualified_table_name format" + ) + continue + + urn = urn_creator( + config=self.config, + platform_instance_resolver=self.platform_instance_resolver, + data_platform_pair=self.get_platform_pair(), + server=server, + qualified_table_name=qualified_table_name, + ) + + dataplatform_tables.append( + DataPlatformTable( + data_platform_pair=self.get_platform_pair(), + urn=urn, + ) + ) + + logger.debug(f"Generated dataplatform_tables {dataplatform_tables}") + + return dataplatform_tables + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail ) -> List[DataPlatformTable]: @@ -727,25 +995,21 @@ def create_dataplatform_tables( 0 ] # Remove any whitespaces and double quotes character - for table in native_sql_parser.get_tables(sql_query): - if len(table.split(".")) != 3: - logger.debug( - f"Skipping table {table} as it is not as per full_table_name format" - ) - continue + server = tree_function.strip_char_from_list([data_access_tokens[2]])[0] - dataplatform_tables.append( - DataPlatformTable( - name=table.split(".")[2], - full_name=table, - datasource_server=tree_function.strip_char_from_list( - [data_access_tokens[2]] - )[0], - data_platform_pair=self.get_platform_pair(), - ) + if self.config.enable_advance_lineage_sql_construct is False: + # Use previous parser to generate URN to keep backward compatibility + return self.create_urn_using_old_parser( + query=sql_query, + server=server, ) - return dataplatform_tables + return self.parse_custom_sql( + query=sql_query, + server=server, + database=None, # database and schema is available inside custom sql as per PowerBI Behavior + schema=None, + ) class FunctionName(Enum): diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 919cb83e4d832..5d477ee090e7e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -28,7 +28,6 @@ ) from datahub.ingestion.source.powerbi.config import ( Constant, - PlatformDetail, PowerBiDashboardSourceConfig, PowerBiDashboardSourceReport, ) @@ -96,10 +95,12 @@ def __hash__(self): def __init__( self, + ctx: PipelineContext, config: PowerBiDashboardSourceConfig, reporter: PowerBiDashboardSourceReport, dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver, ): + self.__ctx = ctx self.__config = config self.__reporter = reporter self.__dataplatform_instance_resolver = dataplatform_instance_resolver @@ -172,43 +173,40 @@ def extract_lineage( # table.dataset should always be set, but we check it just in case. parameters = table.dataset.parameters if table.dataset else {} - upstreams: List[UpstreamClass] = [] - upstream_tables: List[resolver.DataPlatformTable] = parser.get_upstream_tables( - table, self.__reporter, parameters=parameters + upstream: List[UpstreamClass] = [] + + upstream_dpts: List[resolver.DataPlatformTable] = parser.get_upstream_tables( + table=table, + reporter=self.__reporter, + platform_instance_resolver=self.__dataplatform_instance_resolver, + ctx=self.__ctx, + config=self.__config, + parameters=parameters, ) + logger.debug( - f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_tables}" + f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_dpts}" ) - for upstream_table in upstream_tables: + + for upstream_dpt in upstream_dpts: if ( - upstream_table.data_platform_pair.powerbi_data_platform_name + upstream_dpt.data_platform_pair.powerbi_data_platform_name not in self.__config.dataset_type_mapping.keys() ): logger.debug( - f"Skipping upstream table for {ds_urn}. The platform {upstream_table.data_platform_pair.powerbi_data_platform_name} is not part of dataset_type_mapping", + f"Skipping upstream table for {ds_urn}. The platform {upstream_dpt.data_platform_pair.powerbi_data_platform_name} is not part of dataset_type_mapping", ) continue - platform_detail: PlatformDetail = ( - self.__dataplatform_instance_resolver.get_platform_instance( - upstream_table - ) - ) - upstream_urn = builder.make_dataset_urn_with_platform_instance( - platform=upstream_table.data_platform_pair.datahub_data_platform_name, - platform_instance=platform_detail.platform_instance, - env=platform_detail.env, - name=self.lineage_urn_to_lowercase(upstream_table.full_name), - ) - upstream_table_class = UpstreamClass( - upstream_urn, + upstream_dpt.urn, DatasetLineageTypeClass.TRANSFORMED, ) - upstreams.append(upstream_table_class) - if len(upstreams) > 0: - upstream_lineage = UpstreamLineageClass(upstreams=upstreams) + upstream.append(upstream_table_class) + + if len(upstream) > 0: + upstream_lineage = UpstreamLineageClass(upstreams=upstream) logger.debug(f"Dataset urn = {ds_urn} and its lineage = {upstream_lineage}") mcp = MetadataChangeProposalWrapper( entityType=Constant.DATASET, @@ -1107,7 +1105,9 @@ def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext): ) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising # unwanted stacktrace on console - self.mapper = Mapper(config, self.reporter, self.dataplatform_instance_resolver) + self.mapper = Mapper( + ctx, config, self.reporter, self.dataplatform_instance_resolver + ) # Create and register the stateful ingestion use-case handler. self.stale_entity_removal_handler = StaleEntityRemovalHandler.create( diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 6752bdf519830..ec0af37089b1d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -31,6 +31,7 @@ from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError import datahub.emitter.mce_builder as builder +import datahub.utilities.sqlglot_lineage as sqlglot_l from datahub.configuration.common import ( AllowDenyPattern, ConfigModel, @@ -136,12 +137,7 @@ ViewPropertiesClass, ) from datahub.utilities import config_clean -from datahub.utilities.sqlglot_lineage import ( - ColumnLineageInfo, - SchemaResolver, - SqlParsingResult, - sqlglot_lineage, -) +from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult logger: logging.Logger = logging.getLogger(__name__) @@ -1585,42 +1581,14 @@ def parse_custom_sql( f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}" ) - parsed_result: Optional["SqlParsingResult"] = None - try: - schema_resolver = ( - self.ctx.graph._make_schema_resolver( - platform=platform, - platform_instance=platform_instance, - env=env, - ) - if self.ctx.graph is not None - else SchemaResolver( - platform=platform, - platform_instance=platform_instance, - env=env, - graph=None, - ) - ) - - if schema_resolver.graph is None: - logger.warning( - "Column Level Lineage extraction would not work as DataHub graph client is None." - ) - - parsed_result = sqlglot_lineage( - query, - schema_resolver=schema_resolver, - default_db=upstream_db, - ) - except Exception as e: - self.report.report_warning( - key="csql-lineage", - reason=f"Unable to retrieve lineage from query. " - f"Query: {query} " - f"Reason: {str(e)} ", - ) - - return parsed_result + return sqlglot_l.create_lineage_sql_parsed_result( + query=query, + database=upstream_db, + platform=platform, + platform_instance=platform_instance, + env=env, + graph=self.ctx.graph, + ) def _create_lineage_from_unsupported_csql( self, csql_urn: str, csql: dict diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index d5da93c7be35e..49f56b46fb012 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -176,10 +176,8 @@ def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInf for table in runner.target_tables ], ) - except Exception: - logger.info( - f"Could not parse query via lineage runner, {query}", exc_info=True - ) + except Exception as e: + logger.info(f"Could not parse query via lineage runner, {query}: {e!r}") return None @staticmethod @@ -202,8 +200,8 @@ def _parse_query_via_spark_sql_plan(self, query: str) -> Optional[StringTableInf return GenericTableInfo( source_tables=[t for t in tables if t], target_tables=[] ) - except Exception: - logger.info(f"Could not parse query via spark plan, {query}", exc_info=True) + except Exception as e: + logger.info(f"Could not parse query via spark plan, {query}: {e!r}") return None @staticmethod diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index e5a9954802019..6d028c4ac1b9e 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -825,3 +825,43 @@ def sqlglot_lineage( table_error=e, ), ) + + +def create_lineage_sql_parsed_result( + query: str, + database: Optional[str], + platform: str, + platform_instance: Optional[str], + env: str, + schema: Optional[str] = None, + graph: Optional[DataHubGraph] = None, +) -> Optional["SqlParsingResult"]: + + parsed_result: Optional["SqlParsingResult"] = None + try: + schema_resolver = ( + graph._make_schema_resolver( + platform=platform, + platform_instance=platform_instance, + env=env, + ) + if graph is not None + else SchemaResolver( + platform=platform, + platform_instance=platform_instance, + env=env, + graph=None, + ) + ) + + parsed_result = sqlglot_lineage( + query, + schema_resolver=schema_resolver, + default_db=database, + default_schema=schema, + ) + except Exception as e: + logger.debug(f"Fail to prase query {query}", exc_info=e) + logger.warning("Fail to parse custom SQL") + + return parsed_result diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index 5c9553402a8c4..e77a12aa4088e 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -1,17 +1,22 @@ import logging import sys -from typing import List +from typing import List, Tuple import pytest from lark import Tree import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes -from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport -from datahub.ingestion.source.powerbi.m_query import parser, tree_function -from datahub.ingestion.source.powerbi.m_query.resolver import ( - DataPlatformTable, - SupportedDataPlatform, +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.powerbi.config import ( + PowerBiDashboardSourceConfig, + PowerBiDashboardSourceReport, +) +from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( + AbstractDataPlatformInstanceResolver, + create_dataplatform_instance_resolver, ) +from datahub.ingestion.source.powerbi.m_query import parser, tree_function +from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable M_QUERIES = [ 'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table', @@ -38,9 +43,31 @@ 'let\n Source = AmazonRedshift.Database("redshift-url","dev"),\n public = Source{[Name="public"]}[Data],\n category1 = public{[Name="category"]}[Data]\nin\n category1', 'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source', 'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"', + "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source", ] +def get_default_instances( + override_config: dict = {}, +) -> Tuple[ + PipelineContext, PowerBiDashboardSourceConfig, AbstractDataPlatformInstanceResolver +]: + config: PowerBiDashboardSourceConfig = PowerBiDashboardSourceConfig.parse_obj( + { + "tenant_id": "fake", + "client_id": "foo", + "client_secret": "bar", + **override_config, + } + ) + + platform_instance_resolver: AbstractDataPlatformInstanceResolver = ( + create_dataplatform_instance_resolver(config) + ) + + return PipelineContext(run_id="fake"), config, platform_instance_resolver + + @pytest.mark.integration def test_parse_m_query1(): expression: str = M_QUERIES[0] @@ -145,20 +172,20 @@ def test_snowflake_regular_case(): reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == "TESTTABLE" - assert data_platform_tables[0].full_name == "PBI_TEST.TEST.TESTTABLE" assert ( - data_platform_tables[0].datasource_server - == "bu10758.ap-unknown-2.fakecomputing.com" - ) - assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,pbi_test.test.testtable,PROD)" ) @@ -174,17 +201,21 @@ def test_postgres_regular_case(): ) reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == "order_date" - assert data_platform_tables[0].full_name == "mics.public.order_date" - assert data_platform_tables[0].datasource_server == "localhost" assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)" ) @@ -200,19 +231,21 @@ def test_databricks_regular_case(): ) reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == "public_consumer_price_index" assert ( - data_platform_tables[0].full_name - == "hive_metastore.sandbox_revenue.public_consumer_price_index" - ) - assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.DATABRICK_SQL.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.sandbox_revenue.public_consumer_price_index,PROD)" ) @@ -228,17 +261,21 @@ def test_oracle_regular_case(): ) reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == "EMPLOYEES" - assert data_platform_tables[0].full_name == "salesdb.HR.EMPLOYEES" - assert data_platform_tables[0].datasource_server == "localhost:1521" assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.ORACLE.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:oracle,salesdb.hr.employees,PROD)" ) @@ -255,17 +292,20 @@ def test_mssql_regular_case(): reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == "book_issue" - assert data_platform_tables[0].full_name == "library.dbo.book_issue" - assert data_platform_tables[0].datasource_server == "localhost" assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:mssql,library.dbo.book_issue,PROD)" ) @@ -280,14 +320,16 @@ def test_mssql_with_query(): M_QUERIES[11], ] expected_tables = [ - "COMMOPSDB.dbo.V_OIP_ENT_2022", - "COMMOPSDB.dbo.V_INVOICE_BOOKING_2022", - "COMMOPSDB.dbo.V_ARR_ADDS", - "COMMOPSDB.dbo.V_PS_CD_RETENTION", - "COMMOPSDB.dbo.V_TPV_LEADERBOARD", - "COMMOPSDB.dbo.V_ENTERPRISE_INVOICED_REVENUE", + "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_oip_ent_2022,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_invoice_booking_2022,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_arr_adds,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_tpv_leaderboard,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_enterprise_invoiced_revenue,PROD)", ] + ctx, config, platform_instance_resolver = get_default_instances() + for index, query in enumerate(mssql_queries): table: powerbi_data_classes.Table = powerbi_data_classes.Table( columns=[], @@ -299,17 +341,15 @@ def test_mssql_with_query(): reporter = PowerBiDashboardSourceReport() data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter, native_query_enabled=False + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == expected_tables[index].split(".")[2] - assert data_platform_tables[0].full_name == expected_tables[index] - assert data_platform_tables[0].datasource_server == "AUPRDWHDB" - assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name - ) + assert data_platform_tables[0].urn == expected_tables[index] @pytest.mark.integration @@ -322,12 +362,14 @@ def test_snowflake_native_query(): ] expected_tables = [ - "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4", - "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS", - "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS", - "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_aps_sme_units_v4,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)", ] + ctx, config, platform_instance_resolver = get_default_instances() + for index, query in enumerate(snowflake_queries): table: powerbi_data_classes.Table = powerbi_data_classes.Table( columns=[], @@ -339,20 +381,15 @@ def test_snowflake_native_query(): reporter = PowerBiDashboardSourceReport() data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == expected_tables[index].split(".")[2] - assert data_platform_tables[0].full_name == expected_tables[index] - assert ( - data_platform_tables[0].datasource_server - == "bu10758.ap-unknown-2.fakecomputing.com" - ) - assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name - ) + assert data_platform_tables[0].urn == expected_tables[index] def test_google_bigquery_1(): @@ -363,16 +400,20 @@ def test_google_bigquery_1(): ) reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter, native_query_enabled=False + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) + assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == table.full_name.split(".")[2] - assert data_platform_tables[0].full_name == table.full_name - assert data_platform_tables[0].datasource_server == "seraphic-music-344307" assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:bigquery,seraphic-music-344307.school_dataset.first,PROD)" ) @@ -387,23 +428,24 @@ def test_google_bigquery_2(): ) reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( table, reporter, - native_query_enabled=False, parameters={ "Parameter - Source": "my-test-project", "My bq project": "gcp_billing", }, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == table.full_name.split(".")[2] - assert data_platform_tables[0].full_name == table.full_name - assert data_platform_tables[0].datasource_server == "my-test-project" assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-test-project.gcp_billing.gcp_table,PROD)" ) @@ -416,23 +458,24 @@ def test_for_each_expression_1(): reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( table, reporter, - native_query_enabled=False, parameters={ "Parameter - Source": "my-test-project", "My bq project": "gcp_billing", }, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == table.full_name.split(".")[2] - assert data_platform_tables[0].datasource_server == "my-test-project" - assert data_platform_tables[0].full_name == table.full_name assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-test-project.universal.d_wh_date,PROD)" ) @@ -445,22 +488,23 @@ def test_for_each_expression_2(): reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( table, reporter, - native_query_enabled=False, parameters={ "dwh-prod": "originally-not-a-variable-ref-and-not-resolved", }, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == table.full_name.split(".")[2] - assert data_platform_tables[0].full_name == table.full_name - assert data_platform_tables[0].datasource_server == "dwh-prod" assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:bigquery,dwh-prod.gcp_billing.d_gcp_custom_label,PROD)" ) @@ -476,8 +520,14 @@ def test_native_query_disabled(): reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + config.native_query_parsing = False data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter, native_query_enabled=False + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 0 @@ -493,26 +543,25 @@ def test_multi_source_table(): ) reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter, native_query_enabled=False + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 2 - assert data_platform_tables[0].full_name == "mics.public.order_date" - assert data_platform_tables[0].datasource_server == "localhost" - assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name - ) - - assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST_VIEW" assert ( - data_platform_tables[1].datasource_server - == "ghh48144.snowflakefakecomputing.com" + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)" ) assert ( - data_platform_tables[1].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name + data_platform_tables[1].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst_view,PROD)" ) @@ -521,36 +570,33 @@ def test_table_combine(): table: powerbi_data_classes.Table = powerbi_data_classes.Table( columns=[], measures=[], - expression=M_QUERIES[16], # 1st index has the native query + expression=M_QUERIES[16], name="virtual_order_table", full_name="OrderDataSet.virtual_order_table", ) reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 2 - assert data_platform_tables[0].full_name == "GSL_TEST_DB.PUBLIC.SALES_FORECAST" - assert ( - data_platform_tables[0].datasource_server - == "ghh48144.snowflakefakecomputing.com" - ) - assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name - ) - assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST" assert ( - data_platform_tables[1].datasource_server - == "ghh48144.snowflakefakecomputing.com" + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_forecast,PROD)" ) + assert ( - data_platform_tables[1].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name + data_platform_tables[1].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst,PROD)" ) @@ -574,8 +620,14 @@ def test_expression_is_none(): reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) assert len(data_platform_tables) == 0 @@ -589,15 +641,20 @@ def test_redshift_regular_case(): ) reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter, native_query_enabled=False + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) + assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == table.full_name.split(".")[2] - assert data_platform_tables[0].full_name == table.full_name assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.category,PROD)" ) @@ -609,13 +666,60 @@ def test_redshift_native_query(): ) reporter = PowerBiDashboardSourceReport() + ctx, config, platform_instance_resolver = get_default_instances() + + config.native_query_parsing = True + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( - table, reporter, native_query_enabled=True + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, ) + assert len(data_platform_tables) == 1 - assert data_platform_tables[0].name == table.full_name.split(".")[2] - assert data_platform_tables[0].full_name == table.full_name assert ( - data_platform_tables[0].data_platform_pair.powerbi_data_platform_name - == SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.category,PROD)" + ) + + +def test_sqlglot_parser(): + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + expression=M_QUERIES[24], + name="SALES_TARGET", + full_name="dev.public.sales", + ) + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances( + override_config={ + "server_to_platform_instance": { + "bu10758.ap-unknown-2.fakecomputing.com": { + "platform_instance": "sales_deployment", + "env": "PROD", + } + }, + "native_query_parsing": True, + "enable_advance_lineage_sql_construct": True, + } + ) + + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + ) + + assert len(data_platform_tables) == 2 + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit,PROD)" + ) + assert ( + data_platform_tables[1].urn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit_targets,PROD)" ) diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index d04c8d905b439..71428a7847953 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -791,11 +791,9 @@ def test_tableau_unsupported_csql(mock_datahub_graph): database_override_map={"production database": "prod"} ) - with mock.patch( - "datahub.ingestion.source.tableau.sqlglot_lineage" - ) as sqlglot_lineage: + with mock.patch("datahub.ingestion.source.tableau.sqlglot_l") as sqlglot_lineage: - sqlglot_lineage.return_value = SqlParsingResult( # type:ignore + sqlglot_lineage.create_lineage_sql_parsed_result.return_value = SqlParsingResult( # type:ignore in_tables=[ "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)" ], diff --git a/metadata-ingestion/tests/integration/vertica/docker-compose.yml b/metadata-ingestion/tests/integration/vertica/docker-compose.yml index ddaf206f236cf..84af5c32a60e3 100644 --- a/metadata-ingestion/tests/integration/vertica/docker-compose.yml +++ b/metadata-ingestion/tests/integration/vertica/docker-compose.yml @@ -1,6 +1,7 @@ version: "3.9" services: vertica: + platform: linux/amd64 environment: APP_DB_USER: "dbadmin" APP_DB_PASSWORD: "abc123" @@ -18,6 +19,3 @@ services: volumes: vertica-data: - - - diff --git a/metadata-ingestion/tests/integration/vertica/test_vertica.py b/metadata-ingestion/tests/integration/vertica/test_vertica.py index db8bfd247313b..fe306d1d0b2b8 100644 --- a/metadata-ingestion/tests/integration/vertica/test_vertica.py +++ b/metadata-ingestion/tests/integration/vertica/test_vertica.py @@ -58,6 +58,7 @@ def vertica_runner(docker_compose_runner, test_resources_dir): # Test needs more work to be done , currently it is working fine. @freeze_time(FROZEN_TIME) +@pytest.mark.skip("Failing in CI, cmd failing with exit code 1") @pytest.mark.integration def test_vertica_ingest_with_db(vertica_runner, pytestconfig, tmp_path): test_resources_dir = pytestconfig.rootpath / "tests/integration/vertica" diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/fixtures/ElasticSearchGoldenTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/fixtures/ElasticSearchGoldenTest.java index 29457f244291f..d720c95fef84d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/fixtures/ElasticSearchGoldenTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/fixtures/ElasticSearchGoldenTest.java @@ -78,23 +78,6 @@ public void testNameMatchPetProfile() { assertTrue(secondResultUrn.toString().contains("pet_profiles")); } - @Test - public void testNameMatchMemberInWorkspace() { - /* - Searching for "collaborative actionitems" should return "collaborative_actionitems" as the first search - result, followed by "collaborative_actionitems_old" - */ - assertNotNull(searchService); - SearchResult searchResult = searchAcrossEntities(searchService, "collaborative actionitems", SEARCHABLE_LONGTAIL_ENTITIES); - assertTrue(searchResult.getEntities().size() >= 2); - Urn firstResultUrn = searchResult.getEntities().get(0).getEntity(); - Urn secondResultUrn = searchResult.getEntities().get(1).getEntity(); - - // Checks that the table name is not suffixed with anything - assertTrue(firstResultUrn.toString().contains("collaborative_actionitems,")); - assertTrue(secondResultUrn.toString().contains("collaborative_actionitems_old")); - } - @Test public void testGlossaryTerms() { /* @@ -132,9 +115,53 @@ public void testNameMatchPartiallyQualified() { assertTrue(secondResultUrn.toString().contains("dbt,long_tail_companions.analytics.pet_details")); } + @Test + public void testNameMatchCollaborativeActionitems() { + /* + Searching for "collaborative actionitems" should return "collaborative_actionitems" as the first search + result, followed by "collaborative_actionitems_old" + */ + assertNotNull(searchService); + SearchResult searchResult = searchAcrossEntities(searchService, "collaborative actionitems", SEARCHABLE_LONGTAIL_ENTITIES); + assertTrue(searchResult.getEntities().size() >= 2); + Urn firstResultUrn = searchResult.getEntities().get(0).getEntity(); + Urn secondResultUrn = searchResult.getEntities().get(1).getEntity(); + + // Checks that the table name is not suffixed with anything + assertTrue(firstResultUrn.toString().contains("collaborative_actionitems,")); + assertTrue(secondResultUrn.toString().contains("collaborative_actionitems_old")); + + Double firstResultScore = searchResult.getEntities().get(0).getScore(); + Double secondResultScore = searchResult.getEntities().get(1).getScore(); + + // Checks that the scores aren't tied so that we are matching on table name more than column name + assertTrue(firstResultScore > secondResultScore); + } + + @Test + public void testNameMatchCustomerOrders() { + /* + Searching for "customer orders" should return "customer_orders" as the first search + result, not suffixed by anything + */ + assertNotNull(searchService); + SearchResult searchResult = searchAcrossEntities(searchService, "customer orders", SEARCHABLE_LONGTAIL_ENTITIES); + assertTrue(searchResult.getEntities().size() >= 2); + Urn firstResultUrn = searchResult.getEntities().get(0).getEntity(); + + // Checks that the table name is not suffixed with anything + assertTrue(firstResultUrn.toString().contains("customer_orders,")); + + Double firstResultScore = searchResult.getEntities().get(0).getScore(); + Double secondResultScore = searchResult.getEntities().get(1).getScore(); + + // Checks that the scores aren't tied so that we are matching on table name more than column name + assertTrue(firstResultScore > secondResultScore); + } + /* - * Tests that should pass but do not yet can be added below here, with the following annotation: - * @Test(enabled = false) - **/ + Tests that should pass but do not yet can be added below here, with the following annotation: + @Test(enabled = false) + */ } diff --git a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceProperties.pdl b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceProperties.pdl index 46a490dbb2925..c63cb1a97c017 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceProperties.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceProperties.pdl @@ -31,6 +31,7 @@ record DataProcessInstanceProperties includes CustomProperties, ExternalReferenc @Searchable = { "fieldType": "KEYWORD", "addToFilters": true, + "fieldName": "processType", "filterNameOverride": "Process Type" } type: optional enum DataProcessType { diff --git a/smoke-test/run-quickstart.sh b/smoke-test/run-quickstart.sh index 050b5d2db95c9..d40e4a5e7a4aa 100755 --- a/smoke-test/run-quickstart.sh +++ b/smoke-test/run-quickstart.sh @@ -15,4 +15,4 @@ echo "test_user:test_pass" >> ~/.datahub/plugins/frontend/auth/user.props echo "DATAHUB_VERSION = $DATAHUB_VERSION" DATAHUB_TELEMETRY_ENABLED=false \ DOCKER_COMPOSE_BASE="file://$( dirname "$DIR" )" \ -datahub docker quickstart --version ${DATAHUB_VERSION} --standalone_consumers --dump-logs-on-failure --kafka-setup +datahub docker quickstart --version ${DATAHUB_VERSION} --standalone_consumers --dump-logs-on-failure --kafka-setup \ No newline at end of file diff --git a/smoke-test/tests/cypress/cypress/e2e/login/login.js b/smoke-test/tests/cypress/cypress/e2e/login/login.js index f86741b5afe01..74d04aa56d0d0 100644 --- a/smoke-test/tests/cypress/cypress/e2e/login/login.js +++ b/smoke-test/tests/cypress/cypress/e2e/login/login.js @@ -4,6 +4,6 @@ describe('login', () => { cy.get('input[data-testid=username]').type(Cypress.env('ADMIN_USERNAME')); cy.get('input[data-testid=password]').type(Cypress.env('ADMIN_PASSWORD')); cy.contains('Sign In').click(); - cy.contains('Welcome back, DataHub'); + cy.contains('Welcome back, Data Hub'); }); }) diff --git a/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js b/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js index 7686acfe50de0..353570c0d955b 100644 --- a/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js +++ b/smoke-test/tests/cypress/cypress/e2e/settings/managing_groups.js @@ -64,6 +64,8 @@ describe("create and manage group", () => { }); it("update group info", () => { + var expected_name = Cypress.env('ADMIN_USERNAME') == "datahub" ? "Data Hub" : Cypress.env('ADMIN_USERNAME'); + cy.loginWithCredentials(); cy.visit("/settings/identities/groups"); cy.clickOptionWithText(group_name); @@ -77,13 +79,13 @@ describe("create and manage group", () => { cy.contains("Test group description EDITED").should("be.visible"); cy.clickOptionWithText("Add Owners"); cy.contains("Search for users or groups...").click({ force: true }); - cy.focused().type(Cypress.env('ADMIN_USERNAME')); - cy.get(".ant-select-item-option").contains(Cypress.env('ADMIN_USERNAME'), { matchCase: false }).click(); + cy.focused().type(expected_name); + cy.get(".ant-select-item-option").contains(expected_name, { matchCase: false }).click(); cy.focused().blur(); - cy.contains(Cypress.env('ADMIN_USERNAME')).should("have.length", 1); + cy.contains(expected_name).should("have.length", 1); cy.get('[role="dialog"] button').contains("Done").click(); cy.waitTextVisible("Owners Added"); - cy.contains(Cypress.env('ADMIN_USERNAME'), { matchCase: false }).should("be.visible"); + cy.contains(expected_name, { matchCase: false }).should("be.visible"); cy.clickOptionWithText("Edit Group"); cy.waitTextVisible("Edit Profile"); cy.get("#email").type(`${test_id}@testemail.com`);