diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml
index c268a66938945..532669c44722c 100644
--- a/.github/workflows/docker-unified.yml
+++ b/.github/workflows/docker-unified.yml
@@ -63,8 +63,8 @@ jobs:
env:
ENABLE_PUBLISH: ${{ secrets.DOCKER_PASSWORD != '' && secrets.ACRYL_DOCKER_PASSWORD != '' }}
run: |
- echo "Enable publish: ${{ env.ENABLE_PUBLISH != '' }}"
- echo "publish=${{ env.ENABLE_PUBLISH != '' }}" >> $GITHUB_OUTPUT
+ echo "Enable publish: ${{ env.ENABLE_PUBLISH }}"
+ echo "publish=${{ env.ENABLE_PUBLISH }}" >> $GITHUB_OUTPUT
gms_build:
name: Build and Push DataHub GMS Docker Image
@@ -451,8 +451,6 @@ jobs:
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- build-args: |
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-ingestion-base/Dockerfile
@@ -481,7 +479,7 @@ jobs:
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Build and push Base-Slim Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
@@ -493,16 +491,15 @@ jobs:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
build-args: |
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
APP_ENV=slim
- BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
+ BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-ingestion-base/Dockerfile
platforms: linux/amd64,linux/arm64/v8
- name: Compute DataHub Ingestion (Base-Slim) Tag
id: tag
- run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.slim_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_base_full_build:
name: Build and Push DataHub Ingestion (Base-Full) Docker Image
runs-on: ubuntu-latest
@@ -524,7 +521,7 @@ jobs:
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Build and push Base-Full Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
@@ -532,20 +529,19 @@ jobs:
target: full-install
images: |
${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
- tags: ${{ needs.setup.outputs.full_tag }}
+ tags: ${{ needs.setup.outputs.unique_full_tag }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
build-args: |
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
APP_ENV=full
- BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.tag || 'head' }}
+ BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-ingestion-base/Dockerfile
platforms: linux/amd64,linux/arm64/v8
- name: Compute DataHub Ingestion (Base-Full) Tag
id: tag
- run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_slim_build:
@@ -553,6 +549,7 @@ jobs:
runs-on: ubuntu-latest
outputs:
tag: ${{ steps.tag.outputs.tag }}
+ needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.publish != 'true' }}
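+ # true when this run rebuilt the slim image locally (publish disabled), so downstream jobs must download it as an artifact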
needs: [setup, datahub_ingestion_base_slim_build]
steps:
- name: Check out the repo
@@ -572,9 +569,9 @@ jobs:
run: ./gradlew :metadata-ingestion:codegen
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' }}
+ if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.slim_tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }}
- name: Build and push Slim Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
@@ -584,7 +581,7 @@ jobs:
${{ env.DATAHUB_INGESTION_IMAGE }}
build-args: |
BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.slim_tag || 'head' }}
+ DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }}
APP_ENV=slim
tags: ${{ needs.setup.outputs.slim_tag }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
@@ -595,7 +592,7 @@ jobs:
platforms: linux/amd64,linux/arm64/v8
- name: Compute Tag
id: tag
- run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.slim_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_slim_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_slim_scan:
permissions:
contents: read # for actions/checkout to fetch code
@@ -609,15 +606,15 @@ jobs:
uses: actions/checkout@v3
- name: Download image Slim Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' }}
+ if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.slim_tag }}
+ image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }}
- name: Run Trivy vulnerability scanner Slim Image
uses: aquasecurity/trivy-action@0.8.0
env:
TRIVY_OFFLINE_SCAN: true
with:
- image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.slim_tag }}
+ image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }}
format: "template"
template: "@/contrib/sarif.tpl"
output: "trivy-results.sarif"
@@ -634,6 +631,7 @@ jobs:
runs-on: ubuntu-latest
outputs:
tag: ${{ steps.tag.outputs.tag }}
+ needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.publish != 'true' }}
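+ # true when this run rebuilt the full image locally (publish disabled), so downstream jobs must download it as an artifact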
needs: [setup, datahub_ingestion_base_full_build]
steps:
- name: Check out the repo
@@ -653,9 +651,9 @@ jobs:
run: ./gradlew :metadata-ingestion:codegen
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' }}
+ if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}
- name: Build and push Full Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
@@ -665,8 +663,8 @@ jobs:
${{ env.DATAHUB_INGESTION_IMAGE }}
build-args: |
BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }}
- tags: ${{ needs.setup.outputs.full_tag }}
+ DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}
+ tags: ${{ needs.setup.outputs.unique_full_tag }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
@@ -675,7 +673,7 @@ jobs:
platforms: linux/amd64,linux/arm64/v8
- name: Compute Tag (Full)
id: tag
- run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.full_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_full_scan:
permissions:
contents: read # for actions/checkout to fetch code
@@ -689,15 +687,15 @@ jobs:
uses: actions/checkout@v3
- name: Download image Full Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' }}
+ if: ${{ needs.datahub_ingestion_full_build.outputs.needs_artifact_download == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.full_tag }}
+ image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.tag }}
- name: Run Trivy vulnerability scanner Full Image
uses: aquasecurity/trivy-action@0.8.0
env:
TRIVY_OFFLINE_SCAN: true
with:
- image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.full_tag }}
+ image-ref: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.tag }}
format: "template"
template: "@/contrib/sarif.tpl"
output: "trivy-results.sarif"
@@ -750,6 +748,10 @@ jobs:
./gradlew :metadata-ingestion:install
- name: Disk Check
run: df -h . && docker images
+ - name: Remove images
+ run: docker image prune -a -f || true
+ - name: Disk Check
+ run: df -h . && docker images
- name: Download GMS image
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' }}
@@ -792,9 +794,9 @@ jobs:
image: ${{ env.DATAHUB_UPGRADE_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
- name: Download datahub-ingestion-slim image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' }}
+ if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
+ image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }}
- name: Disk Check
run: df -h . && docker images
- name: run quickstart
@@ -812,6 +814,8 @@ jobs:
# we are doing this because gms takes time to get ready
# and we don't have a better readiness check when bootstrap is done
sleep 60s
+ - name: Disk Check
+ run: df -h . && docker images
- name: Disable ES Disk Threshold
run: |
curl -XPUT "http://localhost:9200/_cluster/settings" \
diff --git a/datahub-graphql-core/src/main/resources/search.graphql b/datahub-graphql-core/src/main/resources/search.graphql
index f15535bfb4eb8..fbea66f738955 100644
--- a/datahub-graphql-core/src/main/resources/search.graphql
+++ b/datahub-graphql-core/src/main/resources/search.graphql
@@ -448,6 +448,11 @@ enum FilterOperator {
* Represent the relation: String field is one of the array values to, e.g. name in ["Profile", "Event"]
"""
IN
+
+ """
+ Represents the relation: The field exists. If the field is an array, it must be present and non-empty.
+ """
+ EXISTS
}
"""
diff --git a/datahub-web-react/src/app/entity/view/select/ViewSelect.tsx b/datahub-web-react/src/app/entity/view/select/ViewSelect.tsx
index 03689460eb02b..eda9b7d7fe2a4 100644
--- a/datahub-web-react/src/app/entity/view/select/ViewSelect.tsx
+++ b/datahub-web-react/src/app/entity/view/select/ViewSelect.tsx
@@ -1,4 +1,4 @@
-import React, { useEffect, useRef, useState } from 'react';
+import React, { CSSProperties, useEffect, useRef, useState } from 'react';
import { useHistory } from 'react-router';
import { Select } from 'antd';
import styled from 'styled-components';
@@ -55,11 +55,21 @@ const ViewSelectContainer = styled.div`
.ant-select-selection-item {
font-weight: 700;
font-size: 14px;
+ text-align: left;
}
}
}
`;
+const SelectStyled = styled(Select)`
+ min-width: 90px;
+ max-width: 200px;
+`;
+
+type Props = {
+ dropdownStyle?: CSSProperties;
+};
+
/**
* The View Select component allows you to select a View to apply to query on the current page. For example,
* search, recommendations, and browse.
@@ -69,7 +79,7 @@ const ViewSelectContainer = styled.div`
*
* In the event that a user refreshes their browser, the state of the view should be saved as well.
*/
-export const ViewSelect = () => {
+export const ViewSelect = ({ dropdownStyle = {} }: Props) => {
const history = useHistory();
const userContext = useUserContext();
const [isOpen, setIsOpen] = useState(false);
@@ -188,12 +198,11 @@ export const ViewSelect = () => {
return (
-
+
{viewBuilderDisplayState.visible && (
{
ref={clearButtonRef}
onClick={onHandleClickClear}
>
- All Entities
+ View all
);
diff --git a/datahub-web-react/src/app/preview/DefaultPreviewCard.tsx b/datahub-web-react/src/app/preview/DefaultPreviewCard.tsx
index f776082e3f905..36713cfb7ffcf 100644
--- a/datahub-web-react/src/app/preview/DefaultPreviewCard.tsx
+++ b/datahub-web-react/src/app/preview/DefaultPreviewCard.tsx
@@ -296,7 +296,7 @@ export default function DefaultPreviewCard({
{deprecation?.deprecated && (
)}
- {health && health.length > 0 && }
+ {health && health.length > 0 ? : null}
{externalUrl && (
initialValues?.includes(agg?.entity?.urn || ''))?.entity || null
- }
- onModalClose={onCloseModal}
- onOkOverride={(dataProductUrn) => {
- onSelect([dataProductUrn]);
- onCloseModal();
- }}
- />
- );
- }
-
if (filterField === CONTAINER_FILTER_NAME) {
return (
-
+
)}
> $CONNECTION_PROPERTIES_PATH
fi
-cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
-
+# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
+. kafka-ready.sh
############################################################
# Start Topic Creation Logic
diff --git a/docs/advanced/no-code-modeling.md b/docs/advanced/no-code-modeling.md
index e1fadee6d371a..9c8f6761a62bc 100644
--- a/docs/advanced/no-code-modeling.md
+++ b/docs/advanced/no-code-modeling.md
@@ -211,7 +211,7 @@ record ServiceKey {
* Name of the service
*/
@Searchable = {
- "fieldType": "TEXT_PARTIAL",
+ "fieldType": "WORD_GRAM",
"enableAutocomplete": true
}
name: string
diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index ef4071f89c585..21d59b777dd7c 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -62,6 +62,7 @@ lazy_load_plugins = False
| datahub.cluster | prod | name of the airflow cluster |
| datahub.capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
| datahub.capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
+ | datahub.capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| datahub.graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
5. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html).
@@ -80,9 +81,7 @@ Emitting DataHub ...
If you have created a custom Airflow operator [docs](https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html) that inherits from the BaseOperator class,
when overriding the `execute` function, set inlets and outlets via `context['ti'].task.inlets` and `context['ti'].task.outlets`.
-The DataHub Airflow plugin will then pick up those inlets and outlets after the task runs.
-
-
+The DataHub Airflow plugin will then pick up those inlets and outlets after the task runs.
```python
class DbtOperator(BaseOperator):
@@ -97,8 +96,8 @@ class DbtOperator(BaseOperator):
def _get_lineage(self):
# Do some processing to get inlets/outlets
-
- return inlets, outlets
+
+ return inlets, outlets
```
If you override the `pre_execute` and `post_execute` function, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators respectively. [source](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage)
@@ -172,7 +171,6 @@ Take a look at this sample DAG:
In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details.
-
## Debugging
### Incorrect URLs
diff --git a/docs/modeling/extending-the-metadata-model.md b/docs/modeling/extending-the-metadata-model.md
index 32951ab2e41eb..f47630f44e772 100644
--- a/docs/modeling/extending-the-metadata-model.md
+++ b/docs/modeling/extending-the-metadata-model.md
@@ -323,7 +323,7 @@ It takes the following parameters:
annotations. To customize the set of analyzers used to index a certain field, you must add a new field type and define
the set of mappings to be applied in the MappingsBuilder.
- Thus far, we have implemented 10 fieldTypes:
+ Thus far, we have implemented 11 fieldTypes:
1. *KEYWORD* - Short text fields that only support exact matches, often used only for filtering
@@ -332,20 +332,25 @@ It takes the following parameters:
3. *TEXT_PARTIAL* - Text fields delimited by spaces/slashes/periods with partial matching support. Note, partial
matching is expensive, so this field type should not be applied to fields with long values (like description)
- 4. *BROWSE_PATH* - Field type for browse paths. Applies specific mappings for slash delimited paths.
+ 4. *WORD_GRAM* - Text fields delimited by spaces, slashes, periods, dashes, or underscores, with partial matching AND
+ word gram support. That is, the text is split on the delimiters, and queries can match tokens of length two, three, or
+ four in addition to single tokens. As with partial matching, this type is expensive, so it should not be applied to
+ fields with long values such as description.
- 5. *URN* - Urn fields where each sub-component inside the urn is indexed. For instance, for a data platform urn like
+ 5. *BROWSE_PATH* - Field type for browse paths. Applies specific mappings for slash delimited paths.
+
+ 6. *URN* - Urn fields where each sub-component inside the urn is indexed. For instance, for a data platform urn like
"urn:li:dataplatform:kafka", it will index the platform name "kafka" and ignore the common components
- 6. *URN_PARTIAL* - Urn fields where each sub-component inside the urn is indexed with partial matching support.
+ 7. *URN_PARTIAL* - Urn fields where each sub-component inside the urn is indexed with partial matching support.
- 7. *BOOLEAN* - Boolean fields used for filtering.
+ 8. *BOOLEAN* - Boolean fields used for filtering.
- 8. *COUNT* - Count fields used for filtering.
+ 9. *COUNT* - Count fields used for filtering.
- 9. *DATETIME* - Datetime fields used to represent timestamps.
+ 10. *DATETIME* - Datetime fields used to represent timestamps.
- 10. *OBJECT* - Each property in an object will become an extra column in Elasticsearch and can be referenced as
+ 11. *OBJECT* - Each property in an object will become an extra column in Elasticsearch and can be referenced as
`field.property` in queries. You should be careful to not use it on objects with many properties as it can cause a
mapping explosion in Elasticsearch.
diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java
index f2e65c771c6eb..3d3fbcf3ccaa6 100644
--- a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java
+++ b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java
@@ -21,7 +21,7 @@ public class SearchableAnnotation {
public static final String ANNOTATION_NAME = "Searchable";
private static final Set DEFAULT_QUERY_FIELD_TYPES =
- ImmutableSet.of(FieldType.TEXT, FieldType.TEXT_PARTIAL, FieldType.URN, FieldType.URN_PARTIAL);
+ ImmutableSet.of(FieldType.TEXT, FieldType.TEXT_PARTIAL, FieldType.WORD_GRAM, FieldType.URN, FieldType.URN_PARTIAL);
// Name of the field in the search index. Defaults to the field name in the schema
String fieldName;
@@ -59,7 +59,8 @@ public enum FieldType {
COUNT,
DATETIME,
OBJECT,
- BROWSE_PATH_V2
+ BROWSE_PATH_V2,
+ WORD_GRAM
}
@Nonnull
diff --git a/entity-registry/src/test/java/com/linkedin/metadata/models/EntitySpecBuilderTest.java b/entity-registry/src/test/java/com/linkedin/metadata/models/EntitySpecBuilderTest.java
index 1ab5ff640ce32..3618108970afa 100644
--- a/entity-registry/src/test/java/com/linkedin/metadata/models/EntitySpecBuilderTest.java
+++ b/entity-registry/src/test/java/com/linkedin/metadata/models/EntitySpecBuilderTest.java
@@ -142,7 +142,7 @@ private void validateTestEntityInfo(final AspectSpec testEntityInfo) {
assertEquals(new TestEntityInfo().schema().getFullName(), testEntityInfo.getPegasusSchema().getFullName());
// Assert on Searchable Fields
- assertEquals(9, testEntityInfo.getSearchableFieldSpecs().size());
+ assertEquals(testEntityInfo.getSearchableFieldSpecs().size(), 10);
assertEquals("customProperties", testEntityInfo.getSearchableFieldSpecMap().get(
new PathSpec("customProperties").toString()).getSearchableAnnotation().getFieldName());
assertEquals(SearchableAnnotation.FieldType.KEYWORD, testEntityInfo.getSearchableFieldSpecMap().get(
@@ -158,6 +158,11 @@ private void validateTestEntityInfo(final AspectSpec testEntityInfo) {
assertEquals(SearchableAnnotation.FieldType.TEXT_PARTIAL, testEntityInfo.getSearchableFieldSpecMap().get(
new PathSpec("textArrayField", "*").toString())
.getSearchableAnnotation().getFieldType());
+ assertEquals("wordGramField", testEntityInfo.getSearchableFieldSpecMap().get(
+ new PathSpec("wordGramField").toString()).getSearchableAnnotation().getFieldName());
+ assertEquals(SearchableAnnotation.FieldType.WORD_GRAM, testEntityInfo.getSearchableFieldSpecMap().get(
+ new PathSpec("wordGramField").toString())
+ .getSearchableAnnotation().getFieldType());
assertEquals("nestedIntegerField", testEntityInfo.getSearchableFieldSpecMap().get(
new PathSpec("nestedRecordField", "nestedIntegerField").toString()).getSearchableAnnotation().getFieldName());
assertEquals(SearchableAnnotation.FieldType.COUNT, testEntityInfo.getSearchableFieldSpecMap().get(
diff --git a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md
index 9d400460407c8..03bcef70e1860 100644
--- a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md
+++ b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md
@@ -1,5 +1,60 @@
## Advanced Configurations
+### Working with Platform Instances
+If you have multiple instances of Kafka, or of the source/sink systems referenced in your `kafka-connect` setup, you need to configure platform instances for these systems in the `kafka-connect` recipe to generate correct lineage edges. You must have already set `platform_instance` in the recipes of the original source/sink systems. Refer to [Working with Platform Instances](https://datahubproject.io/docs/platform-instances) to learn more.
+
+There are two ways to declare a source/sink system's `platform_instance` in the `kafka-connect` recipe. If a single instance of a platform is used across all `kafka-connect` connectors, you can use `platform_instance_map` to specify the platform instance to use for that platform when constructing URNs for lineage.
+
+Example:
+```yml
+    # Map of platform name to platform instance
+    platform_instance_map:
+        snowflake: snowflake_platform_instance
+        mysql: mysql_platform_instance
+
+```
+If multiple instances of a platform are used across the `kafka-connect` connectors, you need to specify the platform instance to use for that platform for every connector.
+
+#### Example - Multiple MySQL Source Connectors, each reading from a different MySQL instance
+```yml
+    # Map of platform name to platform instance, per connector
+    connect_to_platform_map:
+        mysql_connector1:
+            mysql: mysql_instance1
+
+        mysql_connector2:
+            mysql: mysql_instance2
+```
+Here, mysql_connector1 and mysql_connector2 are the names of the MySQL source connectors as defined in the `kafka-connect` connector config.
+
+#### Example - Multiple MySQL Source Connectors, each reading from a different MySQL instance and writing to a different Kafka cluster
+```yml
+    connect_to_platform_map:
+        mysql_connector1:
+            mysql: mysql_instance1
+            kafka: kafka_instance1
+
+        mysql_connector2:
+            mysql: mysql_instance2
+            kafka: kafka_instance2
+```
+You can also use a combination of `platform_instance_map` and `connect_to_platform_map` in your recipe. Note that the platform_instance specified for a connector in `connect_to_platform_map` always takes precedence, even if a platform_instance for the same platform is set in `platform_instance_map`.
+
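+As a hypothetical sketch (connector and instance names are illustrative), the two options can be combined in one recipe, with `connect_to_platform_map` overriding `platform_instance_map` where both apply:
+```yml
+    # Fallback instance for any connector not listed in connect_to_platform_map
+    platform_instance_map:
+        mysql: mysql_default_instance
+
+    # Per-connector override; this takes precedence over platform_instance_map
+    connect_to_platform_map:
+        mysql_connector1:
+            mysql: mysql_instance1
+```
+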
+If you do not use `platform_instance` in the original source/sink recipes, you do not need to specify it in the configurations above.
+
+Note that you do not need to specify a platform_instance for BigQuery.
+
+#### Example - Multiple BigQuery Sink Connectors each writing to different kafka cluster
+```yml
+    connect_to_platform_map:
+        bigquery_connector1:
+            kafka: kafka_instance1
+
+        bigquery_connector2:
+            kafka: kafka_instance2
+```
+
+### Provided Configurations from External Sources
Kafka Connect supports pluggable configuration providers which can load configuration data from external sources at runtime. These values are not available to DataHub ingestion source through Kafka Connect APIs. If you are using such provided configurations to specify connection url (database, etc) in Kafka Connect connector configuration then you will need also add these in `provided_configs` section in recipe for DataHub to generate correct lineage.
```yml
diff --git a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml
index f5e33e661622d..cacbda5ca078a 100644
--- a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml
+++ b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml
@@ -3,14 +3,16 @@ source:
config:
# Coordinates
connect_uri: "http://localhost:8083"
-
+
# Credentials
username: admin
password: password
# Optional
- platform_instance_map:
- bigquery: bigquery_platform_instance_id
-
+ # Platform instance mapping to use when constructing URNs.
+ # Use if a single instance of the platform is referred to across connectors.
+ platform_instance_map:
+ mysql: mysql_platform_instance
+
sink:
- # sink configs
\ No newline at end of file
+ # sink configs
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 4ff1d06bb8c22..62cb4f1abb8cf 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -454,7 +454,7 @@ def get_long_description():
"mypy==1.0.0",
# pydantic 1.8.2 is incompatible with mypy 0.910.
# See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
- "pydantic>=1.9.0",
+ "pydantic>=1.10.0",
*test_api_requirements,
pytest_dep,
"pytest-asyncio>=0.16.0",
diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py
index 8e313e92cbf84..c943b83a887ed 100644
--- a/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py
@@ -435,6 +435,7 @@ def _field_from_complex_type(
field_path._set_parent_type_if_not_exists(
DataHubType(type=MapTypeClass, nested_type=value_type)
)
+ # FIXME: description not set. This is present in schema["description"].
yield from JsonSchemaTranslator.get_fields(
JsonSchemaTranslator._get_type_from_schema(
schema["additionalProperties"]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index d1f39a3ba1ba6..7725d63ce0e1e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -129,6 +129,7 @@
# Handle table snapshots
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
SNAPSHOT_TABLE_REGEX = re.compile(r"^(.+)@(\d{13})$")
+CLUSTERING_COLUMN_TAG = "CLUSTERING_COLUMN"
# We can't use close as it is not called if the ingestion is not successful
@@ -1151,6 +1152,21 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
field.description = col.comment
schema_fields[idx] = field
else:
+ tags = []
+ if col.is_partition_column:
+ tags.append(
+ TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
+ )
+
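+ # Tag clustering columns with their ordinal position, e.g. CLUSTERING_COLUMN_1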
+ if col.cluster_column_position is not None:
+ tags.append(
+ TagAssociationClass(
+ make_tag_urn(
+ f"{CLUSTERING_COLUMN_TAG}_{col.cluster_column_position}"
+ )
+ )
+ )
+
field = SchemaField(
fieldPath=col.name,
type=SchemaFieldDataType(
@@ -1160,15 +1176,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
nativeDataType=col.data_type,
description=col.comment,
nullable=col.is_nullable,
- globalTags=GlobalTagsClass(
- tags=[
- TagAssociationClass(
- make_tag_urn(Constants.TAG_PARTITION_KEY)
- )
- ]
- )
- if col.is_partition_column
- else GlobalTagsClass(tags=[]),
+ globalTags=GlobalTagsClass(tags=tags),
)
schema_fields.append(field)
last_id = col.ordinal_position
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index 2450dbd0e2391..f8256f8e6fed6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -33,6 +33,7 @@ class BigqueryTableType:
class BigqueryColumn(BaseColumn):
field_path: str
is_partition_column: bool
+ cluster_column_position: Optional[int]
RANGE_PARTITION_NAME: str = "RANGE"
@@ -285,7 +286,8 @@ class BigqueryQuery:
CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
description as comment,
c.is_hidden as is_hidden,
- c.is_partitioning_column as is_partitioning_column
+ c.is_partitioning_column as is_partitioning_column,
+ c.clustering_ordinal_position as clustering_ordinal_position,
from
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c
join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name
@@ -307,6 +309,7 @@ class BigqueryQuery:
description as comment,
c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column,
+ c.clustering_ordinal_position as clustering_ordinal_position,
-- We count the columns to be able limit it later
row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num,
-- Getting the maximum shard for each table
@@ -333,6 +336,7 @@ class BigqueryQuery:
CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column,
+ c.clustering_ordinal_position as clustering_ordinal_position,
description as comment
from
`{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMNS as c
@@ -583,6 +587,7 @@ def get_columns_for_dataset(
data_type=column.data_type,
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
+ cluster_column_position=column.clustering_ordinal_position,
)
)
@@ -621,6 +626,7 @@ def get_columns_for_table(
data_type=column.data_type,
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
+ cluster_column_position=column.clustering_ordinal_position,
)
)
last_seen_table = column.table_name
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
index 1cd5ed8164854..af9769bc9d94c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
@@ -162,9 +162,11 @@ class DBTCloudConfig(DBTCommonConfig):
}
_DBT_GRAPHQL_QUERY = """
-query DatahubMetadataQuery_{type}($jobId: Int!, $runId: Int) {{
- {type}(jobId: $jobId, runId: $runId) {{
+query DatahubMetadataQuery_{type}($jobId: BigInt!, $runId: BigInt) {{
+ job(id: $jobId, runId: $runId) {{
+ {type} {{
{fields}
+ }}
}}
}}
"""
@@ -218,7 +220,7 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
},
)
- raw_nodes.extend(data[node_type])
+ raw_nodes.extend(data["job"][node_type])
nodes = [self._parse_into_dbt_node(node) for node in raw_nodes]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
index 31d067f984d2d..ffa685fb25826 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -121,6 +121,12 @@ class DataPlatformPair:
powerbi_data_platform_name: str
+@dataclass
+class PowerBIPlatformDetail:
+ data_platform_pair: DataPlatformPair
+ data_platform_server: str
+
+
class SupportedDataPlatform(Enum):
POSTGRES_SQL = DataPlatformPair(
powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
@@ -382,6 +388,15 @@ class PowerBiDashboardSourceConfig(
description="The instance of the platform that all assets produced by this recipe belong to",
)
+ # Enable advanced SQL construct parsing
+ enable_advance_lineage_sql_construct: bool = pydantic.Field(
+ default=False,
+ description="Whether to enable advance native sql construct for parsing like join, sub-queries. "
+ "along this flag , the native_query_parsing should be enabled. "
+ "By default convert_lineage_urns_to_lowercase is enabled, in-case if you have disabled it in previous ingestion execution then it may break lineage "
+ "as this option generates the upstream datasets URN in lowercase.",
+ )
+
@validator("dataset_type_mapping")
@classmethod
def map_data_platform(cls, value):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py
index 396da2d79e3b7..baaa8d5b85ae1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py
@@ -5,8 +5,8 @@
from datahub.ingestion.source.powerbi.config import (
PlatformDetail,
PowerBiDashboardSourceConfig,
+ PowerBIPlatformDetail,
)
-from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable
logger = logging.getLogger(__name__)
@@ -14,7 +14,7 @@
class AbstractDataPlatformInstanceResolver(ABC):
@abstractmethod
def get_platform_instance(
- self, dataplatform_table: DataPlatformTable
+ self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail:
pass
@@ -32,10 +32,10 @@ class ResolvePlatformInstanceFromDatasetTypeMapping(
BaseAbstractDataPlatformInstanceResolver
):
def get_platform_instance(
- self, dataplatform_table: DataPlatformTable
+ self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail:
platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[
- dataplatform_table.data_platform_pair.powerbi_data_platform_name
+ data_platform_detail.data_platform_pair.powerbi_data_platform_name
]
if isinstance(platform, PlatformDetail):
@@ -48,13 +48,13 @@ class ResolvePlatformInstanceFromServerToPlatformInstance(
BaseAbstractDataPlatformInstanceResolver
):
def get_platform_instance(
- self, dataplatform_table: DataPlatformTable
+ self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail:
return (
self.config.server_to_platform_instance[
- dataplatform_table.datasource_server
+ data_platform_detail.data_platform_server
]
- if dataplatform_table.datasource_server
+ if data_platform_detail.data_platform_server
in self.config.server_to_platform_instance
else PlatformDetail.parse_obj({})
)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
index 640bc4bd60d80..021c429c3c633 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
@@ -1,8 +1,12 @@
import logging
-from typing import List
+from typing import List, Optional
import sqlparse
+import datahub.utilities.sqlglot_lineage as sqlglot_l
+from datahub.ingestion.api.common import PipelineContext
+from datahub.utilities.sqlglot_lineage import SqlParsingResult
+
SPECIAL_CHARACTERS = ["#(lf)", "(lf)"]
logger = logging.getLogger()
@@ -45,3 +49,30 @@ def get_tables(native_query: str) -> List[str]:
from_index = from_index + 1
return tables
+
+
+def parse_custom_sql(
+ ctx: PipelineContext,
+ query: str,
+ schema: Optional[str],
+ database: Optional[str],
+ platform: str,
+ env: str,
+ platform_instance: Optional[str],
+) -> Optional["SqlParsingResult"]:
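+ # Strip PowerBI-specific special characters (e.g. "#(lf)") from the query before handing it to the shared sqlglot-based lineage parser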
+
+ logger.debug("Using sqlglot_lineage to parse custom sql")
+
+ sql_query = remove_special_characters(query)
+
+ logger.debug(f"Parsing sql={sql_query}")
+
+ return sqlglot_l.create_lineage_sql_parsed_result(
+ query=sql_query,
+ schema=schema,
+ database=database,
+ platform=platform,
+ platform_instance=platform_instance,
+ env=env,
+ graph=ctx.graph,
+ )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
index 83106c04529d1..8cc38c366c42a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
@@ -6,7 +6,14 @@
import lark
from lark import Lark, Tree
-from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.powerbi.config import (
+ PowerBiDashboardSourceConfig,
+ PowerBiDashboardSourceReport,
+)
+from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
+ AbstractDataPlatformInstanceResolver,
+)
from datahub.ingestion.source.powerbi.m_query import resolver, validator
from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
@@ -45,7 +52,9 @@ def _parse_expression(expression: str) -> Tree:
def get_upstream_tables(
table: Table,
reporter: PowerBiDashboardSourceReport,
- native_query_enabled: bool = True,
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+ ctx: PipelineContext,
+ config: PowerBiDashboardSourceConfig,
parameters: Dict[str, str] = {},
) -> List[resolver.DataPlatformTable]:
if table.expression is None:
@@ -58,7 +67,7 @@ def get_upstream_tables(
parse_tree: Tree = _parse_expression(table.expression)
valid, message = validator.validate_parse_tree(
- parse_tree, native_query_enabled=native_query_enabled
+ parse_tree, native_query_enabled=config.native_query_parsing
)
if valid is False:
assert message is not None
@@ -84,7 +93,11 @@ def get_upstream_tables(
parse_tree=parse_tree,
reporter=reporter,
parameters=parameters,
- ).resolve_to_data_platform_table_list()
+ ).resolve_to_data_platform_table_list(
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
+ )
except BaseException as e:
reporter.report_warning(table.full_name, "Failed to process m-query expression")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
index e2b448124c89d..479f1decff903 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
@@ -6,11 +6,19 @@
from lark import Tree
+import datahub.emitter.mce_builder as builder
+from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
DataPlatformPair,
+ PlatformDetail,
+ PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
+ PowerBIPlatformDetail,
SupportedDataPlatform,
)
+from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
+ AbstractDataPlatformInstanceResolver,
+)
from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
@@ -19,19 +27,98 @@
IdentifierAccessor,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
+from datahub.utilities.sqlglot_lineage import SqlParsingResult
logger = logging.getLogger(__name__)
@dataclass
class DataPlatformTable:
- name: str
- full_name: str
- datasource_server: str
data_platform_pair: DataPlatformPair
+ urn: str
+
+
+def urn_to_lowercase(value: str, flag: bool) -> str:
+ if flag is True:
+ return value.lower()
+
+ return value
+
+
+def urn_creator(
+ config: PowerBiDashboardSourceConfig,
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+ data_platform_pair: DataPlatformPair,
+ server: str,
+ qualified_table_name: str,
+) -> str:
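+ # Resolve the platform instance/env configured for this server, then build the dataset URN, lowercasing the table name when convert_lineage_urns_to_lowercase is enabled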
+
+ platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance(
+ PowerBIPlatformDetail(
+ data_platform_pair=data_platform_pair,
+ data_platform_server=server,
+ )
+ )
+
+ return builder.make_dataset_urn_with_platform_instance(
+ platform=data_platform_pair.datahub_data_platform_name,
+ platform_instance=platform_detail.platform_instance,
+ env=platform_detail.env,
+ name=urn_to_lowercase(
+ qualified_table_name, config.convert_lineage_urns_to_lowercase
+ ),
+ )
class AbstractDataPlatformTableCreator(ABC):
+ """
+ Base class to share common functionalities among different dataplatform for M-Query parsing.
+
+ To create qualified table name we need to parse M-Query data-access-functions(https://learn.microsoft.com/en-us/powerquery-m/accessing-data-functions) and
+ the data-access-functions has some define pattern to access database-name, schema-name and table-name, for example see below M-Query.
+
+ let
+ Source = Sql.Database("localhost", "library"),
+ dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
+ in
+ dbo_book_issue
+
+ It is MSSQL M-Query and Sql.Database is the data-access-function to access MSSQL. If this function is available in M-Query then database name is available in second argument
+ of first statement and schema-name and table-name is available in second statement. second statement can be repeated to access different tables from MSSQL.
+
+ DefaultTwoStepDataAccessSources extends the AbstractDataPlatformTableCreator and provides the common functionalities for data-platform which has above type of M-Query pattern
+
+ data-access-function varies as per data-platform for example for MySQL.Database for MySQL, PostgreSQL.Database for Postgres and Oracle.Database for Oracle and number of statement to
+ find out database-name , schema-name and table-name also varies as per dataplatform.
+
+ Value.NativeQuery is one of the function which is used to execute native query inside M-Query, for example see below M-Query
+
+ let
+ Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true])
+ in
+ Source
+
+ In this M-Query database-name is available in first argument and rest of the detail i.e database & schema is available in native query.
+
+ NativeQueryDataPlatformTableCreator extends AbstractDataPlatformTableCreator to support Redshift and Snowflake native query parsing.
+
+ """
+
+ ctx: PipelineContext
+ config: PowerBiDashboardSourceConfig
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver
+
+ def __init__(
+ self,
+ ctx: PipelineContext,
+ config: PowerBiDashboardSourceConfig,
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+ ) -> None:
+ super().__init__()
+ self.ctx = ctx
+ self.config = config
+ self.platform_instance_resolver = platform_instance_resolver
+
@abstractmethod
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
@@ -58,6 +145,49 @@ def get_db_detail_from_argument(
return arguments[0], arguments[1]
+ def parse_custom_sql(
+ self, query: str, server: str, database: Optional[str], schema: Optional[str]
+ ) -> List[DataPlatformTable]:
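+ # Resolve the platform instance for this server, run the query through the sqlglot-based parser, and wrap each upstream table URN in a DataPlatformTable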
+
+ dataplatform_tables: List[DataPlatformTable] = []
+
+ platform_detail: PlatformDetail = (
+ self.platform_instance_resolver.get_platform_instance(
+ PowerBIPlatformDetail(
+ data_platform_pair=self.get_platform_pair(),
+ data_platform_server=server,
+ )
+ )
+ )
+
+ parsed_result: Optional[
+ "SqlParsingResult"
+ ] = native_sql_parser.parse_custom_sql(
+ ctx=self.ctx,
+ query=query,
+ platform=self.get_platform_pair().datahub_data_platform_name,
+ platform_instance=platform_detail.platform_instance,
+ env=platform_detail.env,
+ database=database,
+ schema=schema,
+ )
+
+ if parsed_result is None:
+ logger.debug("Failed to parse query")
+ return dataplatform_tables
+
+ for urn in parsed_result.in_tables:
+ dataplatform_tables.append(
+ DataPlatformTable(
+ data_platform_pair=self.get_platform_pair(),
+ urn=urn,
+ )
+ )
+
+ logger.debug(f"Generated dataplatform_tables={dataplatform_tables}")
+
+ return dataplatform_tables
+
class AbstractDataAccessMQueryResolver(ABC):
table: Table
@@ -80,11 +210,29 @@ def __init__(
self.data_access_functions = SupportedResolver.get_function_names()
@abstractmethod
- def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]:
+ def resolve_to_data_platform_table_list(
+ self,
+ ctx: PipelineContext,
+ config: PowerBiDashboardSourceConfig,
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+ ) -> List[DataPlatformTable]:
pass
class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
+ """
+ This class parses the M-Query recursively to generate DataAccessFunctionDetail (see method create_data_access_functional_detail).
+
+ This class has generic code to process M-Query tokens and create instance of DataAccessFunctionDetail.
+
+ Once DataAccessFunctionDetail instance is initialized thereafter MQueryResolver generates the DataPlatformTable with the help of AbstractDataPlatformTableCreator
+ (see method resolve_to_data_platform_table_list).
+
+ Classes which extended from AbstractDataPlatformTableCreator knows how to convert generated DataAccessFunctionDetail instance
+ to respective DataPlatformTable instance as per dataplatform.
+
+ """
+
def get_item_selector_tokens(
self,
expression_tree: Tree,
@@ -318,9 +466,15 @@ def internal(
return table_links
- def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]:
+ def resolve_to_data_platform_table_list(
+ self,
+ ctx: PipelineContext,
+ config: PowerBiDashboardSourceConfig,
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+ ) -> List[DataPlatformTable]:
data_platform_tables: List[DataPlatformTable] = []
+ # Find the output variable, as we backtrack through the M-Query from it
output_variable: Optional[str] = tree_function.get_output_variable(
self.parse_tree
)
@@ -332,12 +486,14 @@ def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]:
)
return data_platform_tables
+ # Parse the M-Query with output_variable as the root of the tree and create DataAccessFunctionDetail instances
table_links: List[
DataAccessFunctionDetail
] = self.create_data_access_functional_detail(output_variable)
# Each item is data-access function
for f_detail in table_links:
+ # Get & Check if we support data-access-function available in M-Query
supported_resolver = SupportedResolver.get_resolver(
f_detail.data_access_function_name
)
@@ -351,8 +507,14 @@ def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]:
)
continue
+ # From the supported_resolver enum, get the matching resolver (e.g. AmazonRedshift, Snowflake, Oracle, or NativeQuery) and create an instance of it,
+ # passing along the additional information needed to generate the URN
table_full_name_creator: AbstractDataPlatformTableCreator = (
- supported_resolver.get_table_full_name_creator()()
+ supported_resolver.get_table_full_name_creator()(
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
+ )
)
data_platform_tables.extend(
@@ -393,18 +555,24 @@ def two_level_access_pattern(
IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Item"]
- full_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+ qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug(
- f"Platform({self.get_platform_pair().datahub_data_platform_name}) full_table_name= {full_table_name}"
+ f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}"
+ )
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
)
return [
DataPlatformTable(
- name=table_name,
- full_name=full_table_name,
- datasource_server=server,
data_platform_pair=self.get_platform_pair(),
+ urn=urn,
)
]
@@ -420,9 +588,48 @@ def get_platform_pair(self) -> DataPlatformPair:
class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
+ # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
+ DEFAULT_SCHEMA = "dbo" # Default schema name in MS-SQL is dbo
+
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.MS_SQL.value
+ def create_urn_using_old_parser(
+ self, query: str, db_name: str, server: str
+ ) -> List[DataPlatformTable]:
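+ # Backward-compatible path: extract table names with the previous sqlparse-based parser, default the schema to dbo when missing, and build URNs with urn_creator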
+ dataplatform_tables: List[DataPlatformTable] = []
+
+ tables: List[str] = native_sql_parser.get_tables(query)
+
+ for table in tables:
+ schema_and_table: List[str] = table.split(".")
+ if len(schema_and_table) == 1:
+ # schema name is not present. set default schema
+ schema_and_table.insert(0, MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA)
+
+ qualified_table_name = (
+ f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}"
+ )
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
+ )
+
+ dataplatform_tables.append(
+ DataPlatformTable(
+ data_platform_pair=self.get_platform_pair(),
+ urn=urn,
+ )
+ )
+
+ logger.debug(f"Generated upstream tables = {dataplatform_tables}")
+
+ return dataplatform_tables
+
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[DataPlatformTable]:
@@ -442,28 +649,20 @@ def create_dataplatform_tables(
logger.debug("Unsupported case is found. Second index is not the Query")
return dataplatform_tables
- db_name: str = arguments[1]
-
- tables: List[str] = native_sql_parser.get_tables(arguments[3])
- for table in tables:
- schema_and_table: List[str] = table.split(".")
- if len(schema_and_table) == 1:
- # schema name is not present. Default schema name in MS-SQL is dbo
- # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
- schema_and_table.insert(0, "dbo")
-
- dataplatform_tables.append(
- DataPlatformTable(
- name=schema_and_table[1],
- full_name=f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}",
- datasource_server=arguments[0],
- data_platform_pair=self.get_platform_pair(),
- )
+ if self.config.enable_advance_lineage_sql_construct is False:
+ # Use previous parser to generate URN to keep backward compatibility
+ return self.create_urn_using_old_parser(
+ query=arguments[3],
+ db_name=arguments[1],
+ server=arguments[0],
)
- logger.debug("MS-SQL full-table-names %s", dataplatform_tables)
-
- return dataplatform_tables
+ return self.parse_custom_sql(
+ query=arguments[3],
+ database=arguments[1],
+ server=arguments[0],
+ schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
+ )
class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
@@ -510,12 +709,20 @@ def create_dataplatform_tables(
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"]
+ qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
+ )
+
return [
DataPlatformTable(
- name=table_name,
- full_name=f"{db_name}.{schema_name}.{table_name}",
- datasource_server=server,
data_platform_pair=self.get_platform_pair(),
+ urn=urn,
)
]
@@ -547,14 +754,28 @@ def create_dataplatform_tables(
db_name: str = value_dict["Database"]
schema_name: str = value_dict["Schema"]
table_name: str = value_dict["Table"]
+
+ qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
server, _ = self.get_db_detail_from_argument(data_access_func_detail.arg_list)
+ if server is None:
+ logger.info(
+ f"server information is not available for {qualified_table_name}. Skipping upstream table"
+ )
+ return []
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
+ )
return [
DataPlatformTable(
- name=table_name,
- full_name=f"{db_name}.{schema_name}.{table_name}",
- datasource_server=server if server else "",
data_platform_pair=self.get_platform_pair(),
+ urn=urn,
)
]
@@ -589,20 +810,26 @@ def create_dataplatform_tables(
IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next # type: ignore
).items["Name"]
- full_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+ qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug(
- f"{self.get_platform_pair().datahub_data_platform_name} full-table-name {full_table_name}"
+ f"{self.get_platform_pair().datahub_data_platform_name} qualified_table_name {qualified_table_name}"
+ )
+
+ server: str = self.get_datasource_server(arguments, data_access_func_detail)
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
)
return [
DataPlatformTable(
- name=table_name,
- full_name=full_table_name,
- datasource_server=self.get_datasource_server(
- arguments, data_access_func_detail
- ),
data_platform_pair=self.get_platform_pair(),
+ urn=urn,
)
]
@@ -654,12 +881,20 @@ def create_dataplatform_tables(
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"]
+ qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
+ )
+
return [
DataPlatformTable(
- name=table_name,
- full_name=f"{db_name}.{schema_name}.{table_name}",
- datasource_server=server,
data_platform_pair=self.get_platform_pair(),
+ urn=urn,
)
]
@@ -681,6 +916,39 @@ def is_native_parsing_supported(data_access_function_name: str) -> bool:
in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
)
+ def create_urn_using_old_parser(
+ self, query: str, server: str
+ ) -> List[DataPlatformTable]:
+ dataplatform_tables: List[DataPlatformTable] = []
+
+ tables: List[str] = native_sql_parser.get_tables(query)
+
+ for qualified_table_name in tables:
+ if len(qualified_table_name.split(".")) != 3:
+ logger.debug(
+ f"Skipping table {qualified_table_name} as it is not as per qualified_table_name format"
+ )
+ continue
+
+ urn = urn_creator(
+ config=self.config,
+ platform_instance_resolver=self.platform_instance_resolver,
+ data_platform_pair=self.get_platform_pair(),
+ server=server,
+ qualified_table_name=qualified_table_name,
+ )
+
+ dataplatform_tables.append(
+ DataPlatformTable(
+ data_platform_pair=self.get_platform_pair(),
+ urn=urn,
+ )
+ )
+
+ logger.debug(f"Generated dataplatform_tables {dataplatform_tables}")
+
+ return dataplatform_tables
+
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[DataPlatformTable]:
@@ -727,25 +995,21 @@ def create_dataplatform_tables(
0
] # Remove any whitespaces and double quotes character
- for table in native_sql_parser.get_tables(sql_query):
- if len(table.split(".")) != 3:
- logger.debug(
- f"Skipping table {table} as it is not as per full_table_name format"
- )
- continue
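+ # data_access_tokens[2] holds the server/host argument of the data-access function; strip_char_from_list removes the surrounding quotes.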
+ server = tree_function.strip_char_from_list([data_access_tokens[2]])[0]
- dataplatform_tables.append(
- DataPlatformTable(
- name=table.split(".")[2],
- full_name=table,
- datasource_server=tree_function.strip_char_from_list(
- [data_access_tokens[2]]
- )[0],
- data_platform_pair=self.get_platform_pair(),
- )
+ if self.config.enable_advance_lineage_sql_construct is False:
+ # Use the previous parser to generate URNs, to keep backward compatibility
+ return self.create_urn_using_old_parser(
+ query=sql_query,
+ server=server,
)
- return dataplatform_tables
+ return self.parse_custom_sql(
+ query=sql_query,
+ server=server,
+ database=None, # database and schema are available inside the custom SQL, as per PowerBI behavior
+ schema=None,
+ )
class FunctionName(Enum):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
index 919cb83e4d832..5d477ee090e7e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -28,7 +28,6 @@
)
from datahub.ingestion.source.powerbi.config import (
Constant,
- PlatformDetail,
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
)
@@ -96,10 +95,12 @@ def __hash__(self):
def __init__(
self,
+ ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
reporter: PowerBiDashboardSourceReport,
dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver,
):
+ self.__ctx = ctx
self.__config = config
self.__reporter = reporter
self.__dataplatform_instance_resolver = dataplatform_instance_resolver
@@ -172,43 +173,40 @@ def extract_lineage(
# table.dataset should always be set, but we check it just in case.
parameters = table.dataset.parameters if table.dataset else {}
- upstreams: List[UpstreamClass] = []
- upstream_tables: List[resolver.DataPlatformTable] = parser.get_upstream_tables(
- table, self.__reporter, parameters=parameters
+ upstream: List[UpstreamClass] = []
+
+ upstream_dpts: List[resolver.DataPlatformTable] = parser.get_upstream_tables(
+ table=table,
+ reporter=self.__reporter,
+ platform_instance_resolver=self.__dataplatform_instance_resolver,
+ ctx=self.__ctx,
+ config=self.__config,
+ parameters=parameters,
)
+
logger.debug(
- f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_tables}"
+ f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_dpts}"
)
- for upstream_table in upstream_tables:
+
+ for upstream_dpt in upstream_dpts:
if (
- upstream_table.data_platform_pair.powerbi_data_platform_name
+ upstream_dpt.data_platform_pair.powerbi_data_platform_name
not in self.__config.dataset_type_mapping.keys()
):
logger.debug(
- f"Skipping upstream table for {ds_urn}. The platform {upstream_table.data_platform_pair.powerbi_data_platform_name} is not part of dataset_type_mapping",
+ f"Skipping upstream table for {ds_urn}. The platform {upstream_dpt.data_platform_pair.powerbi_data_platform_name} is not part of dataset_type_mapping",
)
continue
- platform_detail: PlatformDetail = (
- self.__dataplatform_instance_resolver.get_platform_instance(
- upstream_table
- )
- )
- upstream_urn = builder.make_dataset_urn_with_platform_instance(
- platform=upstream_table.data_platform_pair.datahub_data_platform_name,
- platform_instance=platform_detail.platform_instance,
- env=platform_detail.env,
- name=self.lineage_urn_to_lowercase(upstream_table.full_name),
- )
-
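+ # The upstream URN is now computed inside the m_query resolver (via urn_creator), so it can be used here directly.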
upstream_table_class = UpstreamClass(
- upstream_urn,
+ upstream_dpt.urn,
DatasetLineageTypeClass.TRANSFORMED,
)
- upstreams.append(upstream_table_class)
- if len(upstreams) > 0:
- upstream_lineage = UpstreamLineageClass(upstreams=upstreams)
+ upstream.append(upstream_table_class)
+
+ if len(upstream) > 0:
+ upstream_lineage = UpstreamLineageClass(upstreams=upstream)
logger.debug(f"Dataset urn = {ds_urn} and its lineage = {upstream_lineage}")
mcp = MetadataChangeProposalWrapper(
entityType=Constant.DATASET,
@@ -1107,7 +1105,9 @@ def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext):
) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising
# unwanted stacktrace on console
- self.mapper = Mapper(config, self.reporter, self.dataplatform_instance_resolver)
+ self.mapper = Mapper(
+ ctx, config, self.reporter, self.dataplatform_instance_resolver
+ )
# Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
index 039eac1e93819..587c71a98be67 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
@@ -1,5 +1,6 @@
from typing import List, Optional
+from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import DEFAULT_TABLES_DENY_LIST
@@ -575,14 +576,17 @@ def get_access_history_date_range() -> str:
def usage_per_object_per_time_bucket_for_time_window(
start_time_millis: int,
end_time_millis: int,
- time_bucket_size: str,
+ time_bucket_size: BucketDuration,
use_base_objects: bool,
top_n_queries: int,
include_top_n_queries: bool,
) -> str:
if not include_top_n_queries:
top_n_queries = 0
- assert time_bucket_size == "DAY" or time_bucket_size == "HOUR"
+ assert (
+ time_bucket_size == BucketDuration.DAY
+ or time_bucket_size == BucketDuration.HOUR
+ )
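+ # time_bucket_size.value is the Snowflake date part (e.g. "DAY" or "HOUR") interpolated into the DATE_TRUNC calls below.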
objects_column = (
"BASE_OBJECTS_ACCESSED" if use_base_objects else "DIRECT_OBJECTS_ACCESSED"
)
@@ -629,7 +633,7 @@ def usage_per_object_per_time_bucket_for_time_window(
SELECT
object_name,
ANY_VALUE(object_domain) AS object_domain,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries,
count( distinct(user_name) ) AS total_users
FROM
@@ -644,7 +648,7 @@ def usage_per_object_per_time_bucket_for_time_window(
SELECT
object_name,
column_name,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries
FROM
field_access_history
@@ -658,7 +662,7 @@ def usage_per_object_per_time_bucket_for_time_window(
(
SELECT
object_name,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries,
user_name,
ANY_VALUE(users.email) AS user_email
@@ -677,7 +681,7 @@ def usage_per_object_per_time_bucket_for_time_window(
(
SELECT
object_name,
- DATE_TRUNC('{time_bucket_size}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
+ DATE_TRUNC('{time_bucket_size.value}', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
query_history.query_text AS query_text,
count(distinct(access_history.query_id)) AS total_queries
FROM
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
index 3605205b6055c..f8dfa612952d8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
@@ -356,7 +356,6 @@ def _check_usage_date_ranges(self) -> Any:
def _get_operation_aspect_work_unit(
self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
-
if event.query_start_time and event.query_type:
start_time = event.query_start_time
query_type = event.query_type
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index 6752bdf519830..ec0af37089b1d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -31,6 +31,7 @@
from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError
import datahub.emitter.mce_builder as builder
+import datahub.utilities.sqlglot_lineage as sqlglot_l
from datahub.configuration.common import (
AllowDenyPattern,
ConfigModel,
@@ -136,12 +137,7 @@
ViewPropertiesClass,
)
from datahub.utilities import config_clean
-from datahub.utilities.sqlglot_lineage import (
- ColumnLineageInfo,
- SchemaResolver,
- SqlParsingResult,
- sqlglot_lineage,
-)
+from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
logger: logging.Logger = logging.getLogger(__name__)
@@ -1585,42 +1581,14 @@ def parse_custom_sql(
f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
)
- parsed_result: Optional["SqlParsingResult"] = None
- try:
- schema_resolver = (
- self.ctx.graph._make_schema_resolver(
- platform=platform,
- platform_instance=platform_instance,
- env=env,
- )
- if self.ctx.graph is not None
- else SchemaResolver(
- platform=platform,
- platform_instance=platform_instance,
- env=env,
- graph=None,
- )
- )
-
- if schema_resolver.graph is None:
- logger.warning(
- "Column Level Lineage extraction would not work as DataHub graph client is None."
- )
-
- parsed_result = sqlglot_lineage(
- query,
- schema_resolver=schema_resolver,
- default_db=upstream_db,
- )
- except Exception as e:
- self.report.report_warning(
- key="csql-lineage",
- reason=f"Unable to retrieve lineage from query. "
- f"Query: {query} "
- f"Reason: {str(e)} ",
- )
-
- return parsed_result
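+ # Delegate to the shared helper in sqlglot_lineage.py, which builds the schema resolver and runs sqlglot_lineage.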
+ return sqlglot_l.create_lineage_sql_parsed_result(
+ query=query,
+ database=upstream_db,
+ platform=platform,
+ platform_instance=platform_instance,
+ env=env,
+ graph=self.ctx.graph,
+ )
def _create_lineage_from_unsupported_csql(
self, csql_urn: str, csql: dict
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
index d5da93c7be35e..49f56b46fb012 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
@@ -176,10 +176,8 @@ def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInf
for table in runner.target_tables
],
)
- except Exception:
- logger.info(
- f"Could not parse query via lineage runner, {query}", exc_info=True
- )
+ except Exception as e:
+ logger.info(f"Could not parse query via lineage runner, {query}: {e!r}")
return None
@staticmethod
@@ -202,8 +200,8 @@ def _parse_query_via_spark_sql_plan(self, query: str) -> Optional[StringTableInf
return GenericTableInfo(
source_tables=[t for t in tables if t], target_tables=[]
)
- except Exception:
- logger.info(f"Could not parse query via spark plan, {query}", exc_info=True)
+ except Exception as e:
+ logger.info(f"Could not parse query via spark plan, {query}: {e!r}")
return None
@staticmethod
diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
index e5a9954802019..6d028c4ac1b9e 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -825,3 +825,43 @@ def sqlglot_lineage(
table_error=e,
),
)
+
+
+def create_lineage_sql_parsed_result(
+ query: str,
+ database: Optional[str],
+ platform: str,
+ platform_instance: Optional[str],
+ env: str,
+ schema: Optional[str] = None,
+ graph: Optional[DataHubGraph] = None,
+) -> Optional["SqlParsingResult"]:
+
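+ # Illustrative usage (hypothetical values, not part of this change):
+ #   result = create_lineage_sql_parsed_result(
+ #       query="select id from public.orders",
+ #       database="sales_db",
+ #       platform="postgres",
+ #       platform_instance=None,
+ #       env="PROD",
+ #   )
+ #   upstream_urns = result.in_tables if result else []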
+ parsed_result: Optional["SqlParsingResult"] = None
+ try:
+ schema_resolver = (
+ graph._make_schema_resolver(
+ platform=platform,
+ platform_instance=platform_instance,
+ env=env,
+ )
+ if graph is not None
+ else SchemaResolver(
+ platform=platform,
+ platform_instance=platform_instance,
+ env=env,
+ graph=None,
+ )
+ )
+
+ parsed_result = sqlglot_lineage(
+ query,
+ schema_resolver=schema_resolver,
+ default_db=database,
+ default_schema=schema,
+ )
+ except Exception as e:
+ logger.debug(f"Fail to prase query {query}", exc_info=e)
+ logger.warning("Fail to parse custom SQL")
+
+ return parsed_result
diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
index 5c9553402a8c4..e77a12aa4088e 100644
--- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
+++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
@@ -1,17 +1,22 @@
import logging
import sys
-from typing import List
+from typing import List, Tuple
import pytest
from lark import Tree
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
-from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
-from datahub.ingestion.source.powerbi.m_query import parser, tree_function
-from datahub.ingestion.source.powerbi.m_query.resolver import (
- DataPlatformTable,
- SupportedDataPlatform,
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.powerbi.config import (
+ PowerBiDashboardSourceConfig,
+ PowerBiDashboardSourceReport,
+)
+from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
+ AbstractDataPlatformInstanceResolver,
+ create_dataplatform_instance_resolver,
)
+from datahub.ingestion.source.powerbi.m_query import parser, tree_function
+from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable
M_QUERIES = [
'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table',
@@ -38,9 +43,31 @@
'let\n Source = AmazonRedshift.Database("redshift-url","dev"),\n public = Source{[Name="public"]}[Data],\n category1 = public{[Name="category"]}[Data]\nin\n category1',
'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source',
'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"',
+ "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source",
]
+def get_default_instances(
+ override_config: dict = {},
+) -> Tuple[
+ PipelineContext, PowerBiDashboardSourceConfig, AbstractDataPlatformInstanceResolver
+]:
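+ # Shared helper for the tests below: builds a minimal PowerBI source config (override_config tweaks it per test) plus the platform-instance resolver required by the new parser API.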
+ config: PowerBiDashboardSourceConfig = PowerBiDashboardSourceConfig.parse_obj(
+ {
+ "tenant_id": "fake",
+ "client_id": "foo",
+ "client_secret": "bar",
+ **override_config,
+ }
+ )
+
+ platform_instance_resolver: AbstractDataPlatformInstanceResolver = (
+ create_dataplatform_instance_resolver(config)
+ )
+
+ return PipelineContext(run_id="fake"), config, platform_instance_resolver
+
+
@pytest.mark.integration
def test_parse_m_query1():
expression: str = M_QUERIES[0]
@@ -145,20 +172,20 @@ def test_snowflake_regular_case():
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == "TESTTABLE"
- assert data_platform_tables[0].full_name == "PBI_TEST.TEST.TESTTABLE"
assert (
- data_platform_tables[0].datasource_server
- == "bu10758.ap-unknown-2.fakecomputing.com"
- )
- assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,pbi_test.test.testtable,PROD)"
)
@@ -174,17 +201,21 @@ def test_postgres_regular_case():
)
reporter = PowerBiDashboardSourceReport()
+
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == "order_date"
- assert data_platform_tables[0].full_name == "mics.public.order_date"
- assert data_platform_tables[0].datasource_server == "localhost"
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)"
)
@@ -200,19 +231,21 @@ def test_databricks_regular_case():
)
reporter = PowerBiDashboardSourceReport()
+
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == "public_consumer_price_index"
assert (
- data_platform_tables[0].full_name
- == "hive_metastore.sandbox_revenue.public_consumer_price_index"
- )
- assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.DATABRICK_SQL.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.sandbox_revenue.public_consumer_price_index,PROD)"
)
@@ -228,17 +261,21 @@ def test_oracle_regular_case():
)
reporter = PowerBiDashboardSourceReport()
+
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == "EMPLOYEES"
- assert data_platform_tables[0].full_name == "salesdb.HR.EMPLOYEES"
- assert data_platform_tables[0].datasource_server == "localhost:1521"
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.ORACLE.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:oracle,salesdb.hr.employees,PROD)"
)
@@ -255,17 +292,20 @@ def test_mssql_regular_case():
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == "book_issue"
- assert data_platform_tables[0].full_name == "library.dbo.book_issue"
- assert data_platform_tables[0].datasource_server == "localhost"
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:mssql,library.dbo.book_issue,PROD)"
)
@@ -280,14 +320,16 @@ def test_mssql_with_query():
M_QUERIES[11],
]
expected_tables = [
- "COMMOPSDB.dbo.V_OIP_ENT_2022",
- "COMMOPSDB.dbo.V_INVOICE_BOOKING_2022",
- "COMMOPSDB.dbo.V_ARR_ADDS",
- "COMMOPSDB.dbo.V_PS_CD_RETENTION",
- "COMMOPSDB.dbo.V_TPV_LEADERBOARD",
- "COMMOPSDB.dbo.V_ENTERPRISE_INVOICED_REVENUE",
+ "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_oip_ent_2022,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_invoice_booking_2022,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_arr_adds,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_tpv_leaderboard,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_enterprise_invoiced_revenue,PROD)",
]
+ ctx, config, platform_instance_resolver = get_default_instances()
+
for index, query in enumerate(mssql_queries):
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
@@ -299,17 +341,15 @@ def test_mssql_with_query():
reporter = PowerBiDashboardSourceReport()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter, native_query_enabled=False
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == expected_tables[index].split(".")[2]
- assert data_platform_tables[0].full_name == expected_tables[index]
- assert data_platform_tables[0].datasource_server == "AUPRDWHDB"
- assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name
- )
+ assert data_platform_tables[0].urn == expected_tables[index]
@pytest.mark.integration
@@ -322,12 +362,14 @@ def test_snowflake_native_query():
]
expected_tables = [
- "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4",
- "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS",
- "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS",
- "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS",
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_aps_sme_units_v4,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)",
]
+ ctx, config, platform_instance_resolver = get_default_instances()
+
for index, query in enumerate(snowflake_queries):
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
@@ -339,20 +381,15 @@ def test_snowflake_native_query():
reporter = PowerBiDashboardSourceReport()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == expected_tables[index].split(".")[2]
- assert data_platform_tables[0].full_name == expected_tables[index]
- assert (
- data_platform_tables[0].datasource_server
- == "bu10758.ap-unknown-2.fakecomputing.com"
- )
- assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
- )
+ assert data_platform_tables[0].urn == expected_tables[index]
def test_google_bigquery_1():
@@ -363,16 +400,20 @@ def test_google_bigquery_1():
)
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter, native_query_enabled=False
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
+
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == table.full_name.split(".")[2]
- assert data_platform_tables[0].full_name == table.full_name
- assert data_platform_tables[0].datasource_server == "seraphic-music-344307"
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:bigquery,seraphic-music-344307.school_dataset.first,PROD)"
)
@@ -387,23 +428,24 @@ def test_google_bigquery_2():
)
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
- native_query_enabled=False,
parameters={
"Parameter - Source": "my-test-project",
"My bq project": "gcp_billing",
},
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == table.full_name.split(".")[2]
- assert data_platform_tables[0].full_name == table.full_name
- assert data_platform_tables[0].datasource_server == "my-test-project"
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-test-project.gcp_billing.gcp_table,PROD)"
)
@@ -416,23 +458,24 @@ def test_for_each_expression_1():
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
- native_query_enabled=False,
parameters={
"Parameter - Source": "my-test-project",
"My bq project": "gcp_billing",
},
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == table.full_name.split(".")[2]
- assert data_platform_tables[0].datasource_server == "my-test-project"
- assert data_platform_tables[0].full_name == table.full_name
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-test-project.universal.d_wh_date,PROD)"
)
@@ -445,22 +488,23 @@ def test_for_each_expression_2():
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
- native_query_enabled=False,
parameters={
"dwh-prod": "originally-not-a-variable-ref-and-not-resolved",
},
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == table.full_name.split(".")[2]
- assert data_platform_tables[0].full_name == table.full_name
- assert data_platform_tables[0].datasource_server == "dwh-prod"
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:bigquery,dwh-prod.gcp_billing.d_gcp_custom_label,PROD)"
)
@@ -476,8 +520,14 @@ def test_native_query_disabled():
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+ config.native_query_parsing = False
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter, native_query_enabled=False
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 0
@@ -493,26 +543,25 @@ def test_multi_source_table():
)
reporter = PowerBiDashboardSourceReport()
+
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter, native_query_enabled=False
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 2
- assert data_platform_tables[0].full_name == "mics.public.order_date"
- assert data_platform_tables[0].datasource_server == "localhost"
- assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name
- )
-
- assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST_VIEW"
assert (
- data_platform_tables[1].datasource_server
- == "ghh48144.snowflakefakecomputing.com"
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)"
)
assert (
- data_platform_tables[1].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
+ data_platform_tables[1].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst_view,PROD)"
)
@@ -521,36 +570,33 @@ def test_table_combine():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
measures=[],
- expression=M_QUERIES[16], # 1st index has the native query
+ expression=M_QUERIES[16],
name="virtual_order_table",
full_name="OrderDataSet.virtual_order_table",
)
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 2
- assert data_platform_tables[0].full_name == "GSL_TEST_DB.PUBLIC.SALES_FORECAST"
- assert (
- data_platform_tables[0].datasource_server
- == "ghh48144.snowflakefakecomputing.com"
- )
- assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
- )
- assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST"
assert (
- data_platform_tables[1].datasource_server
- == "ghh48144.snowflakefakecomputing.com"
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_forecast,PROD)"
)
+
assert (
- data_platform_tables[1].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
+ data_platform_tables[1].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst,PROD)"
)
@@ -574,8 +620,14 @@ def test_expression_is_none():
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
assert len(data_platform_tables) == 0
@@ -589,15 +641,20 @@ def test_redshift_regular_case():
)
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter, native_query_enabled=False
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
+
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == table.full_name.split(".")[2]
- assert data_platform_tables[0].full_name == table.full_name
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.category,PROD)"
)
@@ -609,13 +666,60 @@ def test_redshift_native_query():
)
reporter = PowerBiDashboardSourceReport()
+ ctx, config, platform_instance_resolver = get_default_instances()
+
+ config.native_query_parsing = True
+
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
- table, reporter, native_query_enabled=True
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
)
+
assert len(data_platform_tables) == 1
- assert data_platform_tables[0].name == table.full_name.split(".")[2]
- assert data_platform_tables[0].full_name == table.full_name
assert (
- data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
- == SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.category,PROD)"
+ )
+
+
+def test_sqlglot_parser():
+ table: powerbi_data_classes.Table = powerbi_data_classes.Table(
+ expression=M_QUERIES[24],
+ name="SALES_TARGET",
+ full_name="dev.public.sales",
+ )
+ reporter = PowerBiDashboardSourceReport()
+
+ ctx, config, platform_instance_resolver = get_default_instances(
+ override_config={
+ "server_to_platform_instance": {
+ "bu10758.ap-unknown-2.fakecomputing.com": {
+ "platform_instance": "sales_deployment",
+ "env": "PROD",
+ }
+ },
+ "native_query_parsing": True,
+ "enable_advance_lineage_sql_construct": True,
+ }
+ )
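+ # server_to_platform_instance maps the Snowflake host to the "sales_deployment" platform instance, which is why the expected URNs below carry that prefix.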
+
+ data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
+ table,
+ reporter,
+ ctx=ctx,
+ config=config,
+ platform_instance_resolver=platform_instance_resolver,
+ )
+
+ assert len(data_platform_tables) == 2
+ assert (
+ data_platform_tables[0].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit,PROD)"
+ )
+ assert (
+ data_platform_tables[1].urn
+ == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit_targets,PROD)"
)
diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
index d04c8d905b439..71428a7847953 100644
--- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
+++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
@@ -791,11 +791,9 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
database_override_map={"production database": "prod"}
)
- with mock.patch(
- "datahub.ingestion.source.tableau.sqlglot_lineage"
- ) as sqlglot_lineage:
+ with mock.patch("datahub.ingestion.source.tableau.sqlglot_l") as sqlglot_lineage:
- sqlglot_lineage.return_value = SqlParsingResult( # type:ignore
+ sqlglot_lineage.create_lineage_sql_parsed_result.return_value = SqlParsingResult( # type:ignore
in_tables=[
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)"
],
diff --git a/metadata-ingestion/tests/integration/vertica/docker-compose.yml b/metadata-ingestion/tests/integration/vertica/docker-compose.yml
index ddaf206f236cf..84af5c32a60e3 100644
--- a/metadata-ingestion/tests/integration/vertica/docker-compose.yml
+++ b/metadata-ingestion/tests/integration/vertica/docker-compose.yml
@@ -1,6 +1,7 @@
version: "3.9"
services:
vertica:
+ platform: linux/amd64
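+ # Presumably pins the image architecture so the Vertica container also runs (via emulation) on arm64 hosts.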
environment:
APP_DB_USER: "dbadmin"
APP_DB_PASSWORD: "abc123"
@@ -18,6 +19,3 @@ services:
volumes:
vertica-data:
-
-
-
diff --git a/metadata-ingestion/tests/integration/vertica/test_vertica.py b/metadata-ingestion/tests/integration/vertica/test_vertica.py
index db8bfd247313b..fe306d1d0b2b8 100644
--- a/metadata-ingestion/tests/integration/vertica/test_vertica.py
+++ b/metadata-ingestion/tests/integration/vertica/test_vertica.py
@@ -58,6 +58,7 @@ def vertica_runner(docker_compose_runner, test_resources_dir):
# Test needs more work to be done , currently it is working fine.
@freeze_time(FROZEN_TIME)
+@pytest.mark.skip("Failing in CI, cmd failing with exit code 1")
@pytest.mark.integration
def test_vertica_ingest_with_db(vertica_runner, pytestconfig, tmp_path):
test_resources_dir = pytestconfig.rootpath / "tests/integration/vertica"
diff --git a/metadata-ingestion/tests/unit/test_bigquery_profiler.py b/metadata-ingestion/tests/unit/test_bigquery_profiler.py
index a2aec8df93d09..44ce5f0a02e37 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_profiler.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_profiler.py
@@ -37,6 +37,7 @@ def test_generate_day_partitioned_partition_profiler_query():
ordinal_position=1,
data_type="TIMESTAMP",
is_partition_column=True,
+ cluster_column_position=None,
comment=None,
is_nullable=False,
)
@@ -79,6 +80,7 @@ def test_generate_day_partitioned_partition_profiler_query_with_set_partition_ti
ordinal_position=1,
data_type="TIMESTAMP",
is_partition_column=True,
+ cluster_column_position=None,
comment=None,
is_nullable=False,
)
@@ -120,6 +122,7 @@ def test_generate_hour_partitioned_partition_profiler_query():
ordinal_position=1,
data_type="TIMESTAMP",
is_partition_column=True,
+ cluster_column_position=None,
comment=None,
is_nullable=False,
)
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java
index 555acb2ffdd3b..efa4e0c279a76 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java
@@ -42,6 +42,9 @@ public static Map<String, Object> getPartialNgramConfigWithOverrides(
@@ private static Map<String, Object> getMappingsForField(@Nonnull final Searchable
mappingForField.put(NORMALIZER, KEYWORD_NORMALIZER);
// Add keyword subfield without lowercase filter
mappingForField.put(FIELDS, ImmutableMap.of(KEYWORD, KEYWORD_TYPE_MAP));
- } else if (fieldType == FieldType.TEXT || fieldType == FieldType.TEXT_PARTIAL) {
+ } else if (fieldType == FieldType.TEXT || fieldType == FieldType.TEXT_PARTIAL || fieldType == FieldType.WORD_GRAM) {
mappingForField.put(TYPE, KEYWORD);
mappingForField.put(NORMALIZER, KEYWORD_NORMALIZER);
Map<String, Object> subFields = new HashMap<>();
- if (fieldType == FieldType.TEXT_PARTIAL) {
+ if (fieldType == FieldType.TEXT_PARTIAL || fieldType == FieldType.WORD_GRAM) {
subFields.put(NGRAM, getPartialNgramConfigWithOverrides(
ImmutableMap.of(
ANALYZER, PARTIAL_ANALYZER
)
));
+ if (fieldType == FieldType.WORD_GRAM) {
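+ // Add one subfield per word-gram length (2, 3, and 4), each indexed and searched with its matching word_gram analyzer.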
+ for (Map.Entry<String, String> entry : Map.of(
+ WORD_GRAMS_LENGTH_2, WORD_GRAM_2_ANALYZER,
+ WORD_GRAMS_LENGTH_3, WORD_GRAM_3_ANALYZER,
+ WORD_GRAMS_LENGTH_4, WORD_GRAM_4_ANALYZER).entrySet()) {
+ String fieldName = entry.getKey();
+ String analyzerName = entry.getValue();
+ subFields.put(fieldName, ImmutableMap.of(
+ TYPE, TEXT,
+ ANALYZER, analyzerName,
+ SEARCH_ANALYZER, analyzerName
+ ));
+ }
+ }
}
subFields.put(DELIMITED, ImmutableMap.of(
TYPE, TEXT,
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/SettingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/SettingsBuilder.java
index 5b3e396837aa7..e180c8296b48d 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/SettingsBuilder.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/SettingsBuilder.java
@@ -66,6 +66,9 @@ public class SettingsBuilder {
public static final String KEYWORD_ANALYZER = "keyword";
public static final String URN_ANALYZER = "urn_component";
public static final String URN_SEARCH_ANALYZER = "query_urn_component";
+ public static final String WORD_GRAM_2_ANALYZER = "word_gram_2";
+ public static final String WORD_GRAM_3_ANALYZER = "word_gram_3";
+ public static final String WORD_GRAM_4_ANALYZER = "word_gram_4";
// Filters
public static final String ALPHANUM_SPACE_ONLY = "alpha_num_space";
@@ -80,6 +83,10 @@ public class SettingsBuilder {
public static final String MULTIFILTER = "multifilter";
public static final String MULTIFILTER_GRAPH = "multifilter_graph";
public static final String PARTIAL_URN_COMPONENT = "partial_urn_component";
+ public static final String SHINGLE = "shingle";
+ public static final String WORD_GRAM_2_FILTER = "word_gram_2_filter";
+ public static final String WORD_GRAM_3_FILTER = "word_gram_3_filter";
+ public static final String WORD_GRAM_4_FILTER = "word_gram_4_filter";
public static final String SNOWBALL = "snowball";
public static final String STEM_OVERRIDE = "stem_override";
public static final String STOP = "stop";
@@ -108,6 +115,7 @@ public class SettingsBuilder {
public static final String SLASH_TOKENIZER = "slash_tokenizer";
public static final String UNIT_SEPARATOR_PATH_TOKENIZER = "unit_separator_path_tokenizer";
public static final String UNIT_SEPARATOR_TOKENIZER = "unit_separator_tokenizer";
+ public static final String WORD_GRAM_TOKENIZER = "word_gram_tokenizer";
// Do not remove the space, needed for multi-term synonyms
public static final List<String> ALPHANUM_SPACE_PATTERNS = ImmutableList.of(
"([a-z0-9 _-]{2,})",
@@ -161,6 +169,13 @@ public class SettingsBuilder {
AUTOCOMPLETE_CUSTOM_DELIMITER,
LOWERCASE);
+ public static final List<String> WORD_GRAM_TOKEN_FILTERS = ImmutableList.of(
+ ASCII_FOLDING,
+ LOWERCASE,
+ TRIM,
+ REMOVE_QUOTES
+ );
+
public final Map<String, Object> settings;
public SettingsBuilder(String mainTokenizer) {
@@ -275,6 +290,17 @@ private static Map buildFilters() throws IOException {
.collect(Collectors.toList()))
.build());
}
+
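+ // Shingle-based word-gram filters: with output_unigrams=false and gram size N, e.g. N=2 turns "customer order table" into ["customer order", "order table"].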
+ for (Map.Entry<String, Integer> entry : Map.of(WORD_GRAM_2_FILTER, 2, WORD_GRAM_3_FILTER, 3, WORD_GRAM_4_FILTER, 4).entrySet()) {
+ String filterName = entry.getKey();
+ Integer gramSize = entry.getValue();
+ filters.put(filterName, ImmutableMap.builder()
+ .put(TYPE, SHINGLE)
+ .put("min_shingle_size", gramSize)
+ .put("max_shingle_size", gramSize)
+ .put("output_unigrams", false)
+ .build());
+ }
}
return filters.build();
@@ -302,13 +328,24 @@ private static Map buildTokenizers() {
.put(DELIMITER, "␟")
.build());
- // Tokenize by whitespace and most special chars
+ // Tokenize by most special chars
+ // Do NOT tokenize by whitespace to keep multi-word synonyms in the same token
+ // The split by whitespace is done later in the token filters phase
tokenizers.put(MAIN_TOKENIZER,
ImmutableMap.builder()
.put(TYPE, PATTERN)
.put(PATTERN, "[(),./:]")
.build());
+ // Tokenize by whitespace and most special chars for wordgrams
+ // only split on '-' when it is not preceded by whitespace, to preserve the exclusion syntax
+ // i.e. "logging-events-bkcp" and "logging-events -bckp" should be handled differently
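+ // With this pattern the first becomes [logging, events, bkcp], while the second keeps the leading dash: [logging, events, -bckp] (illustrative).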
+ tokenizers.put(WORD_GRAM_TOKENIZER,
+ ImmutableMap.builder()
+ .put(TYPE, PATTERN)
+ .put(PATTERN, "[(),./:\\s_]|(?<=\\S)(-)")
+ .build());
+
return tokenizers.build();
}
@@ -382,6 +419,21 @@ private static Map buildAnalyzers(String mainTokenizer) {
.put(FILTER, SEARCH_TOKEN_FILTERS)
.build());
+ // Support word grams
+ for (Map.Entry<String, String> entry : Map.of(
+ WORD_GRAM_2_ANALYZER, WORD_GRAM_2_FILTER,
+ WORD_GRAM_3_ANALYZER, WORD_GRAM_3_FILTER,
+ WORD_GRAM_4_ANALYZER, WORD_GRAM_4_FILTER).entrySet()) {
+ String analyzerName = entry.getKey();
+ String filterName = entry.getValue();
+ analyzers.put(analyzerName, ImmutableMap.builder()
+ .put(TOKENIZER, WORD_GRAM_TOKENIZER)
+ .put(FILTER, ImmutableList.