From 75b36c41ee4fd74891b1bfe37885b4cd840e2906 Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:32:24 -0700 Subject: [PATCH 01/11] docs(protobuf) Update messaging around nesting messages (#9048) --- metadata-integration/java/datahub-protobuf/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-integration/java/datahub-protobuf/README.md b/metadata-integration/java/datahub-protobuf/README.md index daea8d438679c..29b82aa3e68f5 100644 --- a/metadata-integration/java/datahub-protobuf/README.md +++ b/metadata-integration/java/datahub-protobuf/README.md @@ -1,6 +1,6 @@ # Protobuf Schemas -The `datahub-protobuf` module is designed to be used with the Java Emitter, the input is a compiled protobuf binary `*.protoc` files and optionally the corresponding `*.proto` source code. In addition, you can supply the root message in cases where a single protobuf source file includes multiple non-nested messages. +The `datahub-protobuf` module is designed to be used with the Java Emitter, the input is a compiled protobuf binary `*.protoc` files and optionally the corresponding `*.proto` source code. You can supply a file with multiple nested messages to be processed. If you have a file with multiple non-nested messages, you will need to separate them out into different files or supply the root message, as otherwise we will only process the first one. ## Supported Features From b1abd38a6b4aef3da0c50ecd23612cae7e3c5d28 Mon Sep 17 00:00:00 2001 From: Kos Korchak <97058061+kkorchak@users.noreply.github.com> Date: Thu, 19 Oct 2023 15:33:54 -0400 Subject: [PATCH 02/11] refactor(): Use data-testids for glossary_navigation and dataset_ownership tests (#9033) --- .../CreateGlossaryEntityModal.tsx | 7 ++- .../shared/EntityDropdown/EntityDropdown.tsx | 3 +- .../MoveGlossaryEntityModal.tsx | 5 +- .../Ownership/sidebar/SidebarOwnerSection.tsx | 6 +- .../src/app/glossary/BusinessGlossaryPage.tsx | 4 +- .../src/app/glossary/GlossarySidebar.tsx | 2 +- .../e2e/glossary/glossary_navigation.js | 55 +++++++++++-------- .../cypress/e2e/lineage/lineage_graph.js | 2 - .../e2e/mutations/dataset_ownership.js | 2 +- .../tests/cypress/cypress/support/commands.js | 1 + 10 files changed, 54 insertions(+), 33 deletions(-) diff --git a/datahub-web-react/src/app/entity/shared/EntityDropdown/CreateGlossaryEntityModal.tsx b/datahub-web-react/src/app/entity/shared/EntityDropdown/CreateGlossaryEntityModal.tsx index d48ead2f5863e..9788d36af2c65 100644 --- a/datahub-web-react/src/app/entity/shared/EntityDropdown/CreateGlossaryEntityModal.tsx +++ b/datahub-web-react/src/app/entity/shared/EntityDropdown/CreateGlossaryEntityModal.tsx @@ -112,7 +112,11 @@ function CreateGlossaryEntityModal(props: Props) { - @@ -130,6 +134,7 @@ function CreateGlossaryEntityModal(props: Props) { > Name}> setIsMoveModalVisible(true)} > - +  Move diff --git a/datahub-web-react/src/app/entity/shared/EntityDropdown/MoveGlossaryEntityModal.tsx b/datahub-web-react/src/app/entity/shared/EntityDropdown/MoveGlossaryEntityModal.tsx index 5352825708776..37a625f58100b 100644 --- a/datahub-web-react/src/app/entity/shared/EntityDropdown/MoveGlossaryEntityModal.tsx +++ b/datahub-web-react/src/app/entity/shared/EntityDropdown/MoveGlossaryEntityModal.tsx @@ -64,6 +64,7 @@ function MoveGlossaryEntityModal(props: Props) { return ( Cancel - + } > diff --git a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/sidebar/SidebarOwnerSection.tsx 
b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/sidebar/SidebarOwnerSection.tsx index 57743d0531afe..aa9a337d4ba44 100644 --- a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/sidebar/SidebarOwnerSection.tsx +++ b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/Ownership/sidebar/SidebarOwnerSection.tsx @@ -70,7 +70,11 @@ export const SidebarOwnerSection = ({ properties, readOnly }: Props) => { )} {!readOnly && ( - )} diff --git a/datahub-web-react/src/app/glossary/BusinessGlossaryPage.tsx b/datahub-web-react/src/app/glossary/BusinessGlossaryPage.tsx index 11f54cb5078e6..a5262265fd23d 100644 --- a/datahub-web-react/src/app/glossary/BusinessGlossaryPage.tsx +++ b/datahub-web-react/src/app/glossary/BusinessGlossaryPage.tsx @@ -92,11 +92,12 @@ function BusinessGlossaryPage() { {(termsError || nodesError) && ( )} - + Business Glossary
)} diff --git a/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx b/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx index dba9b25e14e99..7a14b6a794189 100644 --- a/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/CreateScheduleStep.tsx @@ -167,7 +167,11 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps
-
diff --git a/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx b/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx index 913f8253ece5a..992ebff643c31 100644 --- a/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx @@ -123,6 +123,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps)
0)} onClick={() => onClickCreate(false)} > diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeBuilder.tsx b/datahub-web-react/src/app/ingest/source/builder/RecipeBuilder.tsx index 4ddeb7b492595..bee9b04cee100 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeBuilder.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeBuilder.tsx @@ -86,10 +86,20 @@ function RecipeBuilder(props: Props) { {sourceConfigs?.displayName} Recipe - switchViews(true)}> + switchViews(true)} + data-testid="recipe-builder-form-button" + > Form - switchViews(false)}> + switchViews(false)} + data-testid="recipe-builder-yaml-button" + > YAML @@ -114,7 +124,9 @@ function RecipeBuilder(props: Props) { - + )} diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js b/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js new file mode 100644 index 0000000000000..6c5dd77810644 --- /dev/null +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js @@ -0,0 +1,68 @@ + +const number = Math.floor(Math.random() * 100000); +const accound_id = `account${number}`; +const warehouse_id = `warehouse${number}`; +const username = `user${number}`; +const password = `password${number}`; +const role = `role${number}`; +const ingestion_source_name = `ingestion source ${number}`; + +describe("ingestion source creation flow", () => { + it("create a ingestion source using ui, verify ingestion source details saved correctly, remove ingestion source", () => { + // Go to ingestion page, create a snowflake source + cy.loginWithCredentials(); + cy.goToIngestionPage(); + cy.clickOptionWithTestId("create-ingestion-source-button"); + cy.clickOptionWithText("Snowflake"); + cy.waitTextVisible("Snowflake Recipe"); + cy.get("#account_id").type(accound_id); + cy.get("#warehouse").type(warehouse_id); + cy.get("#username").type(username); + cy.get("#password").type(password); + cy.focused().blur(); + cy.get("#role").type(role); + + // Verify yaml recipe is generated correctly + cy.clickOptionWithTestId("recipe-builder-yaml-button"); + cy.waitTextVisible("account_id"); + cy.waitTextVisible(accound_id); + cy.waitTextVisible(warehouse_id); + cy.waitTextVisible(username); + cy.waitTextVisible(password); + cy.waitTextVisible(role); + + // Finish creating source + cy.clickOptionWithTestId("recipe-builder-next-button"); + cy.waitTextVisible("Configure an Ingestion Schedule"); + cy.clickOptionWithTestId("ingestion-schedule-next-button"); + cy.waitTextVisible("Give this ingestion source a name."); + cy.get('[data-testid="source-name-input"]').type(ingestion_source_name); + cy.clickOptionWithTestId("ingestion-source-save-button"); + cy.waitTextVisible("Successfully created ingestion source!").wait(5000) + cy.waitTextVisible(ingestion_source_name); + cy.get('[data-testid="ingestion-source-table-status"]').contains("Pending...").should("be.visible"); + + // Verify ingestion source details are saved correctly + cy.get('[data-testid="ingestion-source-table-edit-button"]').first().click(); + cy.waitTextVisible("Edit Ingestion Source"); + cy.get("#account_id").should("have.value", accound_id); + cy.get("#warehouse").should("have.value", warehouse_id); + cy.get("#username").should("have.value", username); + cy.get("#password").should("have.value", password); + cy.get("#role").should("have.value", role); + cy.get("button").contains("Next").click(); + cy.waitTextVisible("Configure an Ingestion Schedule"); + cy.clickOptionWithTestId("ingestion-schedule-next-button"); + 
cy.get('[data-testid="source-name-input"]').clear().type(ingestion_source_name + " EDITED"); + cy.clickOptionWithTestId("ingestion-source-save-button"); + cy.waitTextVisible("Successfully updated ingestion source!"); + cy.waitTextVisible(ingestion_source_name + " EDITED"); + + // Remove ingestion source + cy.get('[data-testid="delete-button"]').first().click(); + cy.waitTextVisible("Confirm Ingestion Source Removal"); + cy.get("button").contains("Yes").click(); + cy.waitTextVisible("Removed ingestion source."); + cy.ensureTextNotPresent(ingestion_source_name + " EDITED") + }) +}); \ No newline at end of file From 2fea466d48c856f5c469af6f611990a200e5bece Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 20 Oct 2023 13:47:52 -0700 Subject: [PATCH 05/11] docs: fix lineage capability annotations (#8954) --- .../src/datahub/ingestion/source/aws/glue.py | 1 + .../datahub/ingestion/source/bigquery_v2/bigquery.py | 1 + .../src/datahub/ingestion/source/kafka_connect.py | 1 + .../datahub/ingestion/source/looker/looker_source.py | 6 +++++- .../src/datahub/ingestion/source/metabase.py | 1 + .../src/datahub/ingestion/source/metadata/lineage.py | 10 +++++++++- .../src/datahub/ingestion/source/mode.py | 1 + .../src/datahub/ingestion/source/nifi.py | 4 +++- .../src/datahub/ingestion/source/powerbi/powerbi.py | 5 ++++- .../src/datahub/ingestion/source/sql_queries.py | 10 +++++++++- .../src/datahub/ingestion/source/superset.py | 1 + .../src/datahub/ingestion/source/tableau.py | 6 +++++- 12 files changed, 41 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index e5dff786b71d1..aa7e5aa352a3e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -221,6 +221,7 @@ def report_table_dropped(self, table: str) -> None: SourceCapability.DELETION_DETECTION, "Enabled by default when stateful ingestion is turned on.", ) +@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") class GlueSource(StatefulIngestionSourceBase): """ Note: if you also have files in S3 that you'd like to ingest, we recommend you use Glue's built-in data catalog. See [here](../../../../docs/generated/ingestion/sources/s3.md) for a quick guide on how to set up a crawler on Glue and ingest the outputs with DataHub. 
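The hunks that follow in this patch all apply the same decorator pattern, so for reference, here is a minimal self-contained sketch of how a source class advertises capabilities on its generated docs page. The `MySource` and `MySourceConfig` names and the `include_column_lineage` option are hypothetical stand-ins, not part of the patch:

```python
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.decorators import (
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.api.source import Source, SourceCapability


class MySourceConfig(ConfigModel):
    # Hypothetical toggle gating column-level lineage extraction.
    include_column_lineage: bool = True


@platform_name("My Platform")  # hypothetical platform
@config_class(MySourceConfig)
@support_status(SupportStatus.TESTING)
@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
@capability(
    SourceCapability.LINEAGE_FINE,
    "Optionally enabled via `include_column_lineage`",
)
class MySource(Source):
    """Each @capability decorator contributes one row to the capability
    table on the source's documentation page, which is the metadata this
    patch corrects for the sources below."""
```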
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 552612f877b9a..692d8c4f81bb6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -153,6 +153,7 @@ def cleanup(config: BigQueryV2Config) -> None: ) @capability(SourceCapability.DESCRIPTIONS, "Enabled by default") @capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") +@capability(SourceCapability.LINEAGE_FINE, "Optionally enabled via configuration") @capability( SourceCapability.USAGE_STATS, "Enabled by default, can be disabled via configuration `include_usage_statistics`", diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 5fae0ee5215a3..1a1e012e80633 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -1096,6 +1096,7 @@ def transform_connector_config( @config_class(KafkaConnectSourceConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") +@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") class KafkaConnectSource(StatefulIngestionSourceBase): config: KafkaConnectSourceConfig report: KafkaConnectSourceReport diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index 8297a0aa8efa7..a3df977582ca4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -103,6 +103,11 @@ @capability( SourceCapability.OWNERSHIP, "Enabled by default, configured using `extract_owners`" ) +@capability(SourceCapability.LINEAGE_COARSE, "Supported by default") +@capability( + SourceCapability.LINEAGE_FINE, + "Enabled by default, configured using `extract_column_level_lineage`", +) @capability( SourceCapability.USAGE_STATS, "Enabled by default, configured using `extract_usage_history`", @@ -1128,7 +1133,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def emit_independent_looks_mcp( self, dashboard_element: LookerDashboardElement ) -> Iterable[MetadataWorkUnit]: - yield from auto_workunit( stream=self._make_chart_metadata_events( dashboard_element=dashboard_element, diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py index fb4512893feb1..24145d60210ff 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py @@ -80,6 +80,7 @@ def remove_trailing_slash(cls, v): @config_class(MetabaseConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") +@capability(SourceCapability.LINEAGE_COARSE, "Supported by default") class MetabaseSource(Source): """ This plugin extracts Charts, dashboards, and associated metadata. 
This plugin is in beta and has only been tested diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py index 1c0c809c16a60..f33c6e0edae3d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py @@ -23,11 +23,17 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, + capability, config_class, platform_name, support_status, ) -from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport +from datahub.ingestion.api.source import ( + MetadataWorkUnitProcessor, + Source, + SourceCapability, + SourceReport, +) from datahub.ingestion.api.source_helpers import ( auto_status_aspect, auto_workunit_reporter, @@ -121,6 +127,8 @@ def version_must_be_1(cls, v): @platform_name("File Based Lineage") @config_class(LineageFileSourceConfig) @support_status(SupportStatus.CERTIFIED) +@capability(SourceCapability.LINEAGE_COARSE, "Specified in the lineage file.") +@capability(SourceCapability.LINEAGE_FINE, "Specified in the lineage file.") @dataclass class LineageFileSource(Source): """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index a000c66a406c2..c46b56da422d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -98,6 +98,7 @@ class HTTPError429(HTTPError): @config_class(ModeConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") +@capability(SourceCapability.LINEAGE_COARSE, "Supported by default") class ModeSource(Source): """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index ac1e03812db3b..bc05edbb3c623 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -26,11 +26,12 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, + capability, config_class, platform_name, support_status, ) -from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.source import Source, SourceCapability, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( DataFlowInfoClass, @@ -360,6 +361,7 @@ def report_dropped(self, ent_name: str) -> None: @platform_name("NiFi", id="nifi") @config_class(NifiSourceConfig) @support_status(SupportStatus.CERTIFIED) +@capability(SourceCapability.LINEAGE_COARSE, "Supported. 
See docs for limitations") class NifiSource(Source): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 52bcef66658c8..4611a8eed4782 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -264,7 +264,6 @@ def extract_lineage( ) if len(upstream) > 0: - upstream_lineage_class: UpstreamLineageClass = UpstreamLineageClass( upstreams=upstream, fineGrainedLineages=cll_lineage or None, @@ -1139,6 +1138,10 @@ def report_to_datahub_work_units( SourceCapability.OWNERSHIP, "Disabled by default, configured using `extract_ownership`", ) +@capability( + SourceCapability.LINEAGE_COARSE, + "Enabled by default, configured using `extract_lineage`.", +) @capability( SourceCapability.LINEAGE_FINE, "Disabled by default, configured using `extract_column_level_lineage`. ", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql_queries.py b/metadata-ingestion/src/datahub/ingestion/source/sql_queries.py index bce4d1ec76e6e..fcf97e461967c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql_queries.py @@ -20,11 +20,17 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, + capability, config_class, platform_name, support_status, ) -from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport +from datahub.ingestion.api.source import ( + MetadataWorkUnitProcessor, + Source, + SourceCapability, + SourceReport, +) from datahub.ingestion.api.source_helpers import auto_workunit_reporter from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DataHubGraph @@ -83,6 +89,8 @@ def compute_stats(self) -> None: @platform_name("SQL Queries") @config_class(SqlQueriesSourceConfig) @support_status(SupportStatus.TESTING) +@capability(SourceCapability.LINEAGE_COARSE, "Parsed from SQL queries") +@capability(SourceCapability.LINEAGE_FINE, "Parsed from SQL queries") class SqlQueriesSource(Source): # TODO: Documentation urns: Optional[Set[str]] diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 14bc4242d2a91..e491a1e8b82fa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -142,6 +142,7 @@ def get_filter_name(filter_obj): @capability( SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion" ) +@capability(SourceCapability.LINEAGE_COARSE, "Supported by default") class SupersetSource(StatefulIngestionSourceBase): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index bad7ae49d325e..4bc40b0aac964 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -452,6 +452,10 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): @capability(SourceCapability.OWNERSHIP, "Requires recipe configuration") @capability(SourceCapability.TAGS, "Requires recipe configuration") @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") +@capability( + 
SourceCapability.LINEAGE_FINE, + "Enabled by default, configure using `extract_column_level_lineage`", +) class TableauSource(StatefulIngestionSourceBase): platform = "tableau" @@ -533,7 +537,7 @@ def fetch_projects(): path=[], ) # Set parent project name - for project_id, project in all_project_map.items(): + for _project_id, project in all_project_map.items(): if ( project.parent_id is not None and project.parent_id in all_project_map From 4d35a254cabb3a6241af8857c7d63298783ebaa7 Mon Sep 17 00:00:00 2001 From: Kos Korchak <97058061+kkorchak@users.noreply.github.com> Date: Fri, 20 Oct 2023 17:09:14 -0400 Subject: [PATCH 06/11] =?UTF-8?q?Added=20more=20data-testid=20usage=20for?= =?UTF-8?q?=20edit=5Fdocumentation=20and=20managing=5Fsecr=E2=80=A6=20(#90?= =?UTF-8?q?60)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../components/legacy/DescriptionModal.tsx | 6 ++- .../shared/components/styled/AddLinkModal.tsx | 6 ++- .../tabs/Documentation/DocumentationTab.tsx | 1 + .../components/DescriptionEditorToolbar.tsx | 2 +- .../app/ingest/secret/SecretBuilderModal.tsx | 4 ++ .../src/app/ingest/secret/SecretsList.tsx | 6 ++- .../e2e/glossary/glossary_navigation.js | 3 +- .../e2e/mutations/edit_documentation.js | 42 +++++++-------- .../cypress/e2e/mutations/managing_secrets.js | 51 ++++++++++--------- 9 files changed, 70 insertions(+), 51 deletions(-) diff --git a/datahub-web-react/src/app/entity/shared/components/legacy/DescriptionModal.tsx b/datahub-web-react/src/app/entity/shared/components/legacy/DescriptionModal.tsx index 579b8c9905da0..cb37c44a36caa 100644 --- a/datahub-web-react/src/app/entity/shared/components/legacy/DescriptionModal.tsx +++ b/datahub-web-react/src/app/entity/shared/components/legacy/DescriptionModal.tsx @@ -41,7 +41,11 @@ export default function UpdateDescriptionModal({ title, description, original, o footer={ <> - diff --git a/datahub-web-react/src/app/entity/shared/components/styled/AddLinkModal.tsx b/datahub-web-react/src/app/entity/shared/components/styled/AddLinkModal.tsx index 34d4f0cb3fe91..68a8cf4094362 100644 --- a/datahub-web-react/src/app/entity/shared/components/styled/AddLinkModal.tsx +++ b/datahub-web-react/src/app/entity/shared/components/styled/AddLinkModal.tsx @@ -57,7 +57,7 @@ export const AddLinkModal = ({ buttonProps, refetch }: AddLinkProps) => { return ( <> - { , - , ]} >
{ {
- diff --git a/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx b/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx index 539eef972608c..30f04d61b8fc9 100644 --- a/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx +++ b/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx @@ -40,6 +40,7 @@ export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel } Cancel
diff --git a/smoke-test/tests/cypress/cypress/e2e/glossary/glossary_navigation.js b/smoke-test/tests/cypress/cypress/e2e/glossary/glossary_navigation.js index f52e4d3984a88..aeceaf99be889 100644 --- a/smoke-test/tests/cypress/cypress/e2e/glossary/glossary_navigation.js +++ b/smoke-test/tests/cypress/cypress/e2e/glossary/glossary_navigation.js @@ -17,8 +17,7 @@ describe("glossary sidebar navigation test", () => { cy.waitTextVisible("Created Term Group!"); cy.waitTextVisible("Create Glossary Term"); cy.enterTextInTestId("create-glossary-entity-modal-name", glossaryTerm); - cy.clickOptionWithTestId("glossary-entity-modal-create-button"); - cy.waitTextVisible("Created Glossary Term!"); + cy.clickOptionWithTestId("glossary-entity-modal-create-button").wait(3000); cy.get('[data-testid="glossary-browser-sidebar"]').contains(glossaryTerm).click().wait(3000); cy.openThreeDotDropdown(); cy.clickOptionWithTestId("entity-menu-move-button") diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/edit_documentation.js b/smoke-test/tests/cypress/cypress/e2e/mutations/edit_documentation.js index 83b66e2cb2549..5f9758a35ca0e 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/edit_documentation.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/edit_documentation.js @@ -10,20 +10,20 @@ describe("edit documentation and link to dataset", () => { cy.visit( "/dataset/urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)/Schema" ); - cy.get("[role='tab']").contains("Documentation").click(); + cy.openEntityTab("Documentation"); cy.waitTextVisible("my hive dataset"); cy.waitTextVisible("Sample doc"); - cy.clickOptionWithText("Edit"); + cy.clickOptionWithTestId("edit-documentation-button"); cy.focused().clear(); cy.focused().type(documentation_edited); - cy.get("button").contains("Save").click(); + cy.clickOptionWithTestId("description-editor-save-button"); cy.waitTextVisible("Description Updated"); cy.waitTextVisible(documentation_edited); //return documentation to original state - cy.clickOptionWithText("Edit"); + cy.clickOptionWithTestId("edit-documentation-button"); cy.focused().clear().wait(1000); cy.focused().type("my hive dataset"); - cy.get("button").contains("Save").click(); + cy.clickOptionWithTestId("description-editor-save-button"); cy.waitTextVisible("Description Updated"); cy.waitTextVisible("my hive dataset"); }); @@ -33,21 +33,21 @@ describe("edit documentation and link to dataset", () => { cy.visit( "/dataset/urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)/Schema" ); - cy.get("[role='tab']").contains("Documentation").click(); + cy.openEntityTab("Documentation"); cy.contains("Sample doc").trigger("mouseover", { force: true }); cy.get('[data-icon="delete"]').click(); cy.waitTextVisible("Link Removed"); - cy.get("button").contains("Add Link").click().wait(1000); - cy.get('[role="dialog"] #addLinkForm_url').type(wrong_url); + cy.clickOptionWithTestId("add-link-button").wait(1000); + cy.enterTextInTestId("add-link-modal-url", wrong_url); cy.waitTextVisible("This field must be a valid url."); cy.focused().clear(); cy.waitTextVisible("A URL is required."); - cy.focused().type(correct_url); + cy.enterTextInTestId("add-link-modal-url", correct_url); cy.ensureTextNotPresent("This field must be a valid url."); - cy.get("#addLinkForm_label").type("Sample doc"); - cy.get('[role="dialog"] button').contains("Add").click(); + cy.enterTextInTestId("add-link-modal-label", "Sample doc"); + cy.clickOptionWithTestId("add-link-modal-add-button"); 
cy.waitTextVisible("Link Added"); - cy.get("[role='tab']").contains("Documentation").click(); + cy.openEntityTab("Documentation"); cy.get(`[href='${correct_url}']`).should("be.visible"); }); @@ -55,18 +55,18 @@ describe("edit documentation and link to dataset", () => { cy.loginWithCredentials(); cy.visit("/domain/urn:li:domain:marketing/Entities"); cy.waitTextVisible("SampleCypressKafkaDataset"); - cy.get("button").contains("Add Link").click().wait(1000); - cy.get('[role="dialog"] #addLinkForm_url').type(wrong_url); + cy.clickOptionWithTestId("add-link-button").wait(1000); + cy.enterTextInTestId("add-link-modal-url", wrong_url); cy.waitTextVisible("This field must be a valid url."); cy.focused().clear(); cy.waitTextVisible("A URL is required."); - cy.focused().type(correct_url); + cy.enterTextInTestId("add-link-modal-url", correct_url); cy.ensureTextNotPresent("This field must be a valid url."); - cy.get("#addLinkForm_label").type("Sample doc"); - cy.get('[role="dialog"] button').contains("Add").click(); + cy.enterTextInTestId("add-link-modal-label", "Sample doc"); + cy.clickOptionWithTestId("add-link-modal-add-button"); cy.waitTextVisible("Link Added"); - cy.get("[role='tab']").contains("Documentation").click(); - cy.waitTextVisible("Edit"); + cy.openEntityTab("Documentation"); + cy.get("[data-testid='edit-documentation-button']").should("be.visible"); cy.get(`[href='${correct_url}']`).should("be.visible"); cy.contains("Sample doc").trigger("mouseover", { force: true }); cy.get('[data-icon="delete"]').click(); @@ -83,14 +83,14 @@ describe("edit documentation and link to dataset", () => { cy.waitTextVisible("Foo field description has changed"); cy.focused().clear().wait(1000); cy.focused().type(documentation_edited); - cy.get("button").contains("Update").click(); + cy.clickOptionWithTestId("description-modal-update-button"); cy.waitTextVisible("Updated!"); cy.waitTextVisible(documentation_edited); cy.waitTextVisible("(edited)"); cy.get("tbody [data-icon='edit']").first().click({ force: true }); cy.focused().clear().wait(1000); cy.focused().type("Foo field description has changed"); - cy.get("button").contains("Update").click(); + cy.clickOptionWithTestId("description-modal-update-button"); cy.waitTextVisible("Updated!"); cy.waitTextVisible("Foo field description has changed"); cy.waitTextVisible("(edited)"); diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js index 466bb2ef0757e..77fd63b9cae02 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js @@ -8,23 +8,24 @@ const ingestion_source_name = `ingestion source ${number}`; describe("managing secrets for ingestion creation", () => { it("create a secret, create ingestion source using a secret, remove a secret", () => { + // Navigate to the manage ingestion page → secrets cy.loginWithCredentials(); - //navigate to the manage ingestion page → secrets cy.goToIngestionPage(); - cy.clickOptionWithText("Secrets"); - //create a new secret - cy.clickOptionWithText("Create new secret"); - cy.get('[role="dialog"]').contains("Create a new Secret").should("be.visible"); - cy.get('[role="dialog"] #name').type(`secretname${number}`); - cy.get('[role="dialog"] #value').type(`secretvalue${number}`); - cy.get('[role="dialog"] #description').type(`secretdescription${number}`); - cy.get('#createSecretButton').click(); + cy.openEntityTab("Secrets"); + + // Create a new 
secret + cy.clickOptionWithTestId("create-secret-button"); + cy.enterTextInTestId('secret-modal-name-input', `secretname${number}`); + cy.enterTextInTestId('secret-modal-value-input', `secretvalue${number}`); + cy.enterTextInTestId('secret-modal-description-input', `secretdescription${number}`); + cy.clickOptionWithTestId("secret-modal-create-button"); cy.waitTextVisible("Successfully created Secret!"); cy.waitTextVisible(`secretname${number}`); - cy.waitTextVisible(`secretdescription${number}`).wait(5000)//prevent issue with missing secret - //create an ingestion source using a secret + cy.waitTextVisible(`secretdescription${number}`).wait(5000) + + // Create an ingestion source using a secret cy.goToIngestionPage(); - cy.clickOptionWithText("Create new source"); + cy.get("#ingestion-create-source").click(); cy.clickOptionWithText("Snowflake"); cy.waitTextVisible("Snowflake Recipe"); cy.get("#account_id").type(accound_id); @@ -40,11 +41,12 @@ describe("managing secrets for ingestion creation", () => { cy.waitTextVisible("Give this ingestion source a name."); cy.get('[data-testid="source-name-input"]').type(ingestion_source_name); cy.get("button").contains("Save").click(); - cy.waitTextVisible("Successfully created ingestion source!").wait(5000)//prevent issue with missing form data + cy.waitTextVisible("Successfully created ingestion source!").wait(5000) cy.waitTextVisible(ingestion_source_name); cy.get("button").contains("Pending...").should("be.visible"); - //remove a secret - cy.clickOptionWithText("Secrets"); + + // Remove a secret + cy.openEntityTab("Secrets"); cy.waitTextVisible(`secretname${number}`); cy.get('[data-icon="delete"]').first().click(); cy.waitTextVisible("Confirm Secret Removal"); @@ -52,14 +54,16 @@ describe("managing secrets for ingestion creation", () => { cy.waitTextVisible("Removed secret."); cy.ensureTextNotPresent(`secretname${number}`); cy.ensureTextNotPresent(`secretdescription${number}`); - //remove ingestion source + + // Remove ingestion source cy.goToIngestionPage(); cy.get('[data-testid="delete-button"]').first().click(); cy.waitTextVisible("Confirm Ingestion Source Removal"); cy.get("button").contains("Yes").click(); cy.waitTextVisible("Removed ingestion source."); cy.ensureTextNotPresent(ingestion_source_name) - //verify secret is not present during ingestion source creation for password dropdown + + // Verify secret is not present during ingestion source creation for password dropdown cy.clickOptionWithText("Create new source"); cy.clickOptionWithText("Snowflake"); cy.waitTextVisible("Snowflake Recipe"); @@ -68,13 +72,13 @@ describe("managing secrets for ingestion creation", () => { cy.get("#username").type(username); cy.get("#password").click().wait(1000); cy.ensureTextNotPresent(`secretname${number}`); - //verify secret can be added during ingestion source creation and used successfully + + // Verify secret can be added during ingestion source creation and used successfully cy.clickOptionWithText("Create Secret"); - cy.get('[role="dialog"]').contains("Create a new Secret").should("be.visible"); - cy.get('[role="dialog"] #name').type(`secretname${number}`); - cy.get('[role="dialog"] #value').type(`secretvalue${number}`); - cy.get('[role="dialog"] #description').type(`secretdescription${number}`); - cy.get('#createSecretButton').click(); + cy.enterTextInTestId('secret-modal-name-input', `secretname${number}`) + cy.enterTextInTestId('secret-modal-value-input', `secretvalue${number}`) + cy.enterTextInTestId('secret-modal-description-input', 
`secretdescription${number}`) + cy.clickOptionWithTestId("secret-modal-create-button"); cy.waitTextVisible("Created secret!"); cy.get("#role").type(role); cy.get("button").contains("Next").click(); @@ -86,6 +90,7 @@ describe("managing secrets for ingestion creation", () => { cy.waitTextVisible("Successfully created ingestion source!").wait(5000)//prevent issue with missing form data cy.waitTextVisible(ingestion_source_name); cy.get("button").contains("Pending...").should("be.visible"); + //Remove ingestion source and secret cy.goToIngestionPage(); cy.get('[data-testid="delete-button"]').first().click(); From 63599c95553b89304b656efb2c208c9084d60717 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Sat, 21 Oct 2023 03:17:28 -0700 Subject: [PATCH 07/11] fix(search): fix mapping builder bug (#9062) --- .../search/elasticsearch/indexbuilder/MappingsBuilder.java | 2 +- .../timeseries/search/TimeseriesAspectServiceTestBase.java | 6 ++++-- .../io/datahubproject/test/search/SearchTestContainer.java | 2 +- smoke-test/tests/containers/containers_test.py | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java index 1edc77bbd214c..35cef71edd953 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java @@ -133,7 +133,7 @@ private static Map getMappingsForField(@Nonnull final Searchable } else if (fieldType == FieldType.DATETIME) { mappingForField.put(TYPE, ESUtils.DATE_FIELD_TYPE); } else if (fieldType == FieldType.OBJECT) { - mappingForField.put(TYPE, ESUtils.DATE_FIELD_TYPE); + mappingForField.put(TYPE, ESUtils.OBJECT_FIELD_TYPE); } else { log.info("FieldType {} has no mappings implemented", fieldType); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java index f9b8f84b10ad2..b19d2026fbfc4 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java @@ -889,15 +889,17 @@ public void testCountByFilterAfterDelete() throws InterruptedException { @Test(groups = {"getAggregatedStats"}, dependsOnGroups = {"upsert"}) public void testGetIndexSizes() { List result = _elasticSearchTimeseriesAspectService.getIndexSizes(); + //CHECKSTYLE:OFF /* Example result: {aspectName=testentityprofile, sizeMb=52.234, indexName=es_timeseries_aspect_service_test_testentity_testentityprofileaspect_v1, entityName=testentity} {aspectName=testentityprofile, sizeMb=0.208, indexName=es_timeseries_aspect_service_test_testentitywithouttests_testentityprofileaspect_v1, entityName=testentitywithouttests} */ // There may be other indices in there from other tests, so just make sure that index for entity + aspect is in there - assertTrue(result.size() > 1); + //CHECKSTYLE:ON + assertTrue(result.size() > 0); assertTrue( result.stream().anyMatch(idxSizeResult -> idxSizeResult.getIndexName().equals( - 
"es_timeseries_aspect_service_test_testentitywithouttests_testentityprofileaspect_v1"))); + "es_timeseries_aspect_service_test_testentity_testentityprofileaspect_v1"))); } } diff --git a/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java b/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java index 67e1ee368f513..4c1555fc510e6 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java +++ b/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java @@ -5,7 +5,7 @@ import java.time.Duration; public interface SearchTestContainer { - String SEARCH_JAVA_OPTS = "-Xms64m -Xmx384m -XX:MaxDirectMemorySize=368435456"; + String SEARCH_JAVA_OPTS = "-Xms446m -Xmx446m -XX:MaxDirectMemorySize=368435456"; Duration STARTUP_TIMEOUT = Duration.ofMinutes(5); // usually < 1min GenericContainer startContainer(); diff --git a/smoke-test/tests/containers/containers_test.py b/smoke-test/tests/containers/containers_test.py index 05a45239dabf8..227645a87d30a 100644 --- a/smoke-test/tests/containers/containers_test.py +++ b/smoke-test/tests/containers/containers_test.py @@ -227,6 +227,7 @@ def test_update_container(frontend_session, ingest_cleanup_data): "ownerUrn": new_owner, "resourceUrn": container_urn, "ownerEntityType": "CORP_USER", + "ownershipTypeUrn": "urn:li:ownershipType:__system__technical_owner" } }, } From 86e0023a4e158467130f7337478a48bf98fb344b Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Sat, 21 Oct 2023 16:20:59 +0100 Subject: [PATCH 08/11] feat(ingestion): Adds more advanced configurations for runtime debugging (#8998) --- .../ingest/IngestionResolverUtils.java | 10 ++ ...eateIngestionExecutionRequestResolver.java | 3 + .../source/UpsertIngestionSourceResolver.java | 10 ++ .../src/main/resources/ingestion.graphql | 10 ++ .../UpsertIngestionSourceResolverTest.java | 2 +- .../app/ingest/source/IngestionSourceList.tsx | 8 +- .../ingest/source/builder/NameSourceStep.tsx | 123 +++++++++++++++++- .../src/app/ingest/source/builder/types.ts | 17 +++ .../src/graphql/ingestion.graphql | 8 ++ docker/build.gradle | 12 +- docs/ui-ingestion.md | 20 ++- .../docs/dev_guides/profiling_ingestions.md | 39 ++++++ .../TimeseriesAspectServiceTestBase.java | 6 +- .../test/search/SearchTestContainer.java | 2 + .../ingestion/DataHubIngestionSourceInfo.pdl | 13 +- 15 files changed, 267 insertions(+), 16 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java index 7db0b6f826a04..1140c031f1d35 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java @@ -5,6 +5,7 @@ import com.linkedin.datahub.graphql.generated.IngestionConfig; import com.linkedin.datahub.graphql.generated.IngestionSchedule; import com.linkedin.datahub.graphql.generated.IngestionSource; +import com.linkedin.datahub.graphql.generated.StringMapEntry; import com.linkedin.datahub.graphql.generated.StructuredReport; import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper; import com.linkedin.entity.EntityResponse; @@ -21,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import 
lombok.extern.slf4j.Slf4j; @@ -143,6 +145,14 @@ public static IngestionConfig mapIngestionSourceConfig(final DataHubIngestionSou result.setVersion(config.getVersion()); result.setExecutorId(config.getExecutorId()); result.setDebugMode(config.isDebugMode()); + if (config.getExtraArgs() != null) { + List extraArgs = config.getExtraArgs() + .keySet() + .stream() + .map(key -> new StringMapEntry(key, config.getExtraArgs().get(key))) + .collect(Collectors.toList()); + result.setExtraArgs(extraArgs); + } return result; } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java index e5064e6620526..ea20b837e0a1f 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java @@ -117,6 +117,9 @@ public CompletableFuture get(final DataFetchingEnvironment environment) if (ingestionSourceInfo.getConfig().hasDebugMode()) { debugMode = ingestionSourceInfo.getConfig().isDebugMode() ? "true" : "false"; } + if (ingestionSourceInfo.getConfig().hasExtraArgs()) { + arguments.putAll(ingestionSourceInfo.getConfig().getExtraArgs()); + } arguments.put(DEBUG_MODE_ARG_NAME, debugMode); execInput.setArgs(new StringMap(arguments)); diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java index 2ce394ad5ba84..68e334bd976f8 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java @@ -1,10 +1,12 @@ package com.linkedin.datahub.graphql.resolvers.ingest.source; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.StringMap; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.exception.AuthorizationException; import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode; import com.linkedin.datahub.graphql.exception.DataHubGraphQLException; +import com.linkedin.datahub.graphql.generated.StringMapEntryInput; import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput; import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput; import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput; @@ -17,6 +19,8 @@ import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; +import java.util.Map; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import java.net.URISyntaxException; @@ -108,6 +112,12 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig if (input.getDebugMode() != null) { result.setDebugMode(input.getDebugMode()); } + if (input.getExtraArgs() != null) { + Map extraArgs = input.getExtraArgs() + .stream() + .collect(Collectors.toMap(StringMapEntryInput::getKey, StringMapEntryInput::getValue)); + result.setExtraArgs(new 
StringMap(extraArgs)); + } return result; } diff --git a/datahub-graphql-core/src/main/resources/ingestion.graphql b/datahub-graphql-core/src/main/resources/ingestion.graphql index 69c8aff124583..21f9fb2633119 100644 --- a/datahub-graphql-core/src/main/resources/ingestion.graphql +++ b/datahub-graphql-core/src/main/resources/ingestion.graphql @@ -332,6 +332,11 @@ type IngestionConfig { Advanced: Whether or not to run ingestion in debug mode """ debugMode: Boolean + + """ + Advanced: Extra arguments for the ingestion run. + """ + extraArgs: [StringMapEntry!] } """ @@ -483,6 +488,11 @@ input UpdateIngestionSourceConfigInput { Whether or not to run ingestion in debug mode """ debugMode: Boolean + + """ + Extra arguments for the ingestion run. + """ + extraArgs: [StringMapEntryInput!] } """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolverTest.java index 2538accc694fb..16d8da9169a8f 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolverTest.java @@ -26,7 +26,7 @@ public class UpsertIngestionSourceResolverTest { "Test source", "mysql", "Test source description", new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"), - new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false) + new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false, null) ); @Test diff --git a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx index 0e341a5ff3a79..13af19b0b6ac2 100644 --- a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx +++ b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx @@ -15,7 +15,7 @@ import { Message } from '../../shared/Message'; import TabToolbar from '../../entity/shared/components/styled/TabToolbar'; import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal'; import { addToListIngestionSourcesCache, CLI_EXECUTOR_ID, removeFromListIngestionSourcesCache } from './utils'; -import { DEFAULT_EXECUTOR_ID, SourceBuilderState } from './builder/types'; +import { DEFAULT_EXECUTOR_ID, SourceBuilderState, StringMapEntryInput } from './builder/types'; import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated'; import { SearchBar } from '../../search/SearchBar'; import { useEntityRegistry } from '../../useEntityRegistry'; @@ -173,6 +173,11 @@ export const IngestionSourceList = () => { setFocusSourceUrn(undefined); }; + const formatExtraArgs = (extraArgs): StringMapEntryInput[] => { + if (extraArgs === null || extraArgs === undefined) return []; + return extraArgs.map((entry) => ({ key: entry.key, value: entry.value })); + }; + const createOrUpdateIngestionSource = ( input: UpdateIngestionSourceInput, resetState: () => void, @@ -294,6 +299,7 @@ export const IngestionSourceList = () => { (recipeBuilderState.config?.executorId as string)) || DEFAULT_EXECUTOR_ID, debugMode: recipeBuilderState.config?.debugMode || false, + extraArgs: formatExtraArgs(recipeBuilderState.config?.extraArgs || []), }, schedule: recipeBuilderState.schedule && { interval: 
recipeBuilderState.schedule?.interval as string, diff --git a/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx b/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx index 992ebff643c31..f4c048bcaf0d2 100644 --- a/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx @@ -1,7 +1,7 @@ import { Button, Checkbox, Collapse, Form, Input, Typography } from 'antd'; import React from 'react'; import styled from 'styled-components'; -import { SourceBuilderState, StepProps } from './types'; +import { SourceBuilderState, StepProps, StringMapEntryInput } from './types'; const ControlsContainer = styled.div` display: flex; @@ -13,6 +13,10 @@ const SaveButton = styled(Button)` margin-right: 15px; `; +const ExtraEnvKey = 'extra_env_vars'; +const ExtraReqKey = 'extra_pip_requirements'; +const ExtraPluginKey = 'extra_pip_plugins'; + export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) => { const setName = (stagedName: string) => { const newState: SourceBuilderState = { @@ -55,6 +59,90 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) updateState(newState); }; + const retrieveExtraEnvs = () => { + const extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : []; + const index: number = extraArgs.findIndex((entry) => entry.key === ExtraEnvKey) as number; + if (index > -1) { + return extraArgs[index].value; + } + return ''; + }; + + const setExtraEnvs = (envs: string) => { + let extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : []; + const indxOfEnvVars: number = extraArgs.findIndex((entry) => entry.key === ExtraEnvKey) as number; + const value = { key: ExtraEnvKey, value: envs }; + if (indxOfEnvVars > -1) { + extraArgs[indxOfEnvVars] = value; + } else { + extraArgs = [...extraArgs, value]; + } + const newState: SourceBuilderState = { + ...state, + config: { + ...state.config, + extraArgs, + }, + }; + updateState(newState); + }; + + const retrieveExtraDataHubPlugins = () => { + const extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : []; + const index: number = extraArgs.findIndex((entry) => entry.key === ExtraPluginKey) as number; + if (index > -1) { + return extraArgs[index].value; + } + return ''; + }; + + const setExtraDataHubPlugins = (plugins: string) => { + let extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : []; + const indxOfPlugins: number = extraArgs.findIndex((entry) => entry.key === ExtraPluginKey) as number; + const value = { key: ExtraPluginKey, value: plugins }; + if (indxOfPlugins > -1) { + extraArgs[indxOfPlugins] = value; + } else { + extraArgs = [...extraArgs, value]; + } + const newState: SourceBuilderState = { + ...state, + config: { + ...state.config, + extraArgs, + }, + }; + updateState(newState); + }; + + const retrieveExtraReqs = () => { + const extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : []; + const index: number = extraArgs.findIndex((entry) => entry.key === ExtraReqKey) as number; + if (index > -1) { + return extraArgs[index].value; + } + return ''; + }; + + const setExtraReqs = (reqs: string) => { + let extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? 
state.config?.extraArgs : []; + const indxOfReqs: number = extraArgs.findIndex((entry) => entry.key === ExtraReqKey) as number; + const value = { key: ExtraReqKey, value: reqs }; + if (indxOfReqs > -1) { + extraArgs[indxOfReqs] = value; + } else { + extraArgs = [...extraArgs, value]; + } + const newState: SourceBuilderState = { + ...state, + config: { + ...state.config, + extraArgs, + }, + }; + updateState(newState); + }; + const onClickCreate = (shouldRun?: boolean) => { if (state.name !== undefined && state.name.length > 0) { submit(shouldRun); @@ -116,6 +204,39 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) onChange={(event) => setDebugMode(event.target.checked)} />
+ Extra Environment Variables}> + + Advanced: Set extra environment variables to an ingestion execution + + setExtraEnvs(event.target.value)} + /> + + Extra DataHub plugins}> + + Advanced: Set extra DataHub plugins for an ingestion execution + + setExtraDataHubPlugins(event.target.value)} + /> + + Extra Pip Libraries}> + + Advanced: Add extra pip libraries for an ingestion execution + + setExtraReqs(event.target.value)} + /> +
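The three panels above write to well-known keys (`extra_env_vars`, `extra_pip_plugins`, `extra_pip_requirements`) in the new `extraArgs` list, and `CreateIngestionExecutionRequestResolver` copies those entries into the execution request's arguments. As a rough sketch, an `UpdateIngestionSourceInput` payload carrying these fields could look like the following; the shape mirrors the GraphQL changes in this patch, but the concrete values, and the assumption that the env-vars value is a JSON map while the pip values are JSON lists, are illustrative only:

```python
# Hypothetical upsert payload; only the extraArgs entries are new in this patch.
ingestion_source_input = {
    "name": "My Snowflake Source",
    "type": "snowflake",
    "config": {
        "recipe": '{"source": {"type": "snowflake", "config": {}}}',
        "executorId": "default",
        "debugMode": False,
        "extraArgs": [
            # Environment variables injected into the ingestion run.
            {"key": "extra_env_vars", "value": '{"HTTPS_PROXY": "http://proxy.internal:3128"}'},
            # Extra DataHub CLI plugins installed at runtime.
            {"key": "extra_pip_plugins", "value": '["debug"]'},
            # Extra pip requirements installed at runtime.
            {"key": "extra_pip_requirements", "value": '["sqlparse==0.4.4"]'},
        ],
    },
}
```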
diff --git a/datahub-web-react/src/app/ingest/source/builder/types.ts b/datahub-web-react/src/app/ingest/source/builder/types.ts index cfe0f27ae7dbe..2df467b7beba1 100644 --- a/datahub-web-react/src/app/ingest/source/builder/types.ts +++ b/datahub-web-react/src/app/ingest/source/builder/types.ts @@ -34,6 +34,18 @@ export type StepProps = { ingestionSources: SourceConfig[]; }; +export type StringMapEntryInput = { + /** + * The key of the map entry + */ + key: string; + + /** + * The value fo the map entry + */ + value: string; +}; + /** * The object represents the state of the Ingestion Source Builder form. */ @@ -91,5 +103,10 @@ export interface SourceBuilderState { * Advanced: Whether or not to run this ingestion source in debug mode */ debugMode?: boolean | null; + + /** + * Advanced: Extra arguments for the ingestion run. + */ + extraArgs?: StringMapEntryInput[] | null; }; } diff --git a/datahub-web-react/src/graphql/ingestion.graphql b/datahub-web-react/src/graphql/ingestion.graphql index c127e9ec03f9a..1767fe34bfef0 100644 --- a/datahub-web-react/src/graphql/ingestion.graphql +++ b/datahub-web-react/src/graphql/ingestion.graphql @@ -12,6 +12,10 @@ query listIngestionSources($input: ListIngestionSourcesInput!) { version executorId debugMode + extraArgs { + key + value + } } schedule { interval @@ -51,6 +55,10 @@ query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) { version executorId debugMode + extraArgs { + key + value + } } schedule { interval diff --git a/docker/build.gradle b/docker/build.gradle index c8fdbc86b18b7..56634a5fe0c67 100644 --- a/docker/build.gradle +++ b/docker/build.gradle @@ -97,10 +97,20 @@ task quickstartDebug(type: Exec, dependsOn: ':metadata-ingestion:install') { dependsOn(debug_modules.collect { it + ':dockerTagDebug' }) shouldRunAfter ':metadata-ingestion:clean', 'quickstartNuke' - environment "DATAHUB_PRECREATE_TOPICS", "true" environment "DATAHUB_TELEMETRY_ENABLED", "false" environment "DOCKER_COMPOSE_BASE", "file://${rootProject.projectDir}" + // Elastic + // environment "DATAHUB_SEARCH_IMAGE", 'elasticsearch' + // environment "DATAHUB_SEARCH_TAG", '7.10.1' + + // OpenSearch + environment "DATAHUB_SEARCH_IMAGE", 'opensearchproject/opensearch' + environment "DATAHUB_SEARCH_TAG", '2.9.0' + environment "XPACK_SECURITY_ENABLED", 'plugins.security.disabled=true' + environment "USE_AWS_ELASTICSEARCH", 'true' + + def cmd = [ 'source ../metadata-ingestion/venv/bin/activate && ', 'datahub docker quickstart', diff --git a/docs/ui-ingestion.md b/docs/ui-ingestion.md index db2007e1e19a9..438ddd8823b7e 100644 --- a/docs/ui-ingestion.md +++ b/docs/ui-ingestion.md @@ -1,5 +1,12 @@ +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + # Ingestion + + ## Introduction Starting in version `0.8.25`, DataHub supports creating, configuring, scheduling, & executing batch metadata ingestion using the DataHub user interface. This makes @@ -173,28 +180,29 @@ Finally, give your Ingestion Source a name. Once you're happy with your configurations, click 'Done' to save your changes. 
-##### Advanced: Running with a specific CLI version +##### Advanced ingestion configs: -DataHub comes pre-configured to use the latest version of the DataHub CLI ([acryl-datahub](https://pypi.org/project/acryl-datahub/)) that is compatible +DataHub's Managed Ingestion UI comes pre-configured to use the latest version of the DataHub CLI ([acryl-datahub](https://pypi.org/project/acryl-datahub/)) that is compatible with the server. However, you can override the default package version using the 'Advanced' source configurations. To do so, simply click 'Advanced', then change the 'CLI Version' text box to contain the exact version of the DataHub CLI you'd like to use. -

_Pinning the CLI version to version `0.8.23.2`_ +Other advanced options include specifying **environment variables**, **DataHub plugins** or **python packages at runtime**. + Once you're happy with your changes, simply click 'Done' to save. You can upload and even update recipes using the cli as mentioned in the [cli documentation for uploading ingestion recipes](./cli.md#ingest-deploy). -An example execution would look something like: +An example execution for a given `recipe.yaml` file, would look something like: ```bash datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml @@ -330,8 +338,8 @@ for the `datahub-actions` container and running `docker logs `. There are valid cases for ingesting metadata without the UI-based ingestion scheduler. For example, - You have written a custom ingestion Source -- Your data sources are not reachable on the network where DataHub is deployed -- Your ingestion source requires context from a local filesystem (e.g. input files, environment variables, etc) +- Your data sources are not reachable on the network where DataHub is deployed. Managed DataHub users can use a [remote executor](managed-datahub/operator-guide/setting-up-remote-ingestion-executor-on-aws.md) for remote UI-based ingestion. +- Your ingestion source requires context from a local filesystem (e.g. input files) - You want to distribute metadata ingestion among multiple producers / environments ### How do I attach policies to the actions pod to give it permissions to pull metadata from various sources? diff --git a/metadata-ingestion/docs/dev_guides/profiling_ingestions.md b/metadata-ingestion/docs/dev_guides/profiling_ingestions.md index d876d99b494f8..77cc2f456aa2d 100644 --- a/metadata-ingestion/docs/dev_guides/profiling_ingestions.md +++ b/metadata-ingestion/docs/dev_guides/profiling_ingestions.md @@ -13,6 +13,35 @@ This page documents how to perform memory profiles of ingestion runs. It is useful when trying to size the amount of resources necessary to ingest some source or when developing new features or sources. ## How to use + + + + +Create an ingestion as specified in the [Ingestion guide](../../../docs/ui-ingestion.md). + +Add a flag to your ingestion recipe to generate a memray memory dump of your ingestion: +```yaml +source: + ... + +sink: + ... + +flags: + generate_memory_profiles: "" +``` + +In the final panel, under the advanced section, add the `debug` datahub package under the **Extra DataHub Plugins** section. +As seen below: + +

+_[Screenshot: the ingestion builder's Advanced section, with the `debug` package added under 'Extra DataHub Plugins'.]_
+
+Finally, save and run the ingestion process.
+
+Install the `debug` plugin for DataHub's CLI wherever the ingestion runs:
 
 ```bash
@@ -33,6 +62,16 @@ flags:
   generate_memory_profiles: ""
 ```
 
+Finally, run the ingestion recipe:
+
+```bash
+$ datahub ingest -c recipe.yaml
+```
+
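For context, a minimal sketch of what the `recipe.yaml` above might contain; the `mysql` source and its connection details are illustrative assumptions, while `flags.generate_memory_profiles` mirrors the flag documented earlier:

```yaml
# Illustrative recipe with memory profiling enabled (assumed values).
source:
  type: mysql
  config:
    host_port: localhost:3306
    username: datahub
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080
flags:
  # Folder where the memray dump files will be written (arbitrary path).
  generate_memory_profiles: "/tmp/memray-dumps"
```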
+ + Once the ingestion run starts a binary file will be created and appended to during the execution of the ingestion. These files follow the pattern `file-.bin` for a unique identification. diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java index b19d2026fbfc4..1362a0f69eff2 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java @@ -892,8 +892,10 @@ public void testGetIndexSizes() { //CHECKSTYLE:OFF /* Example result: - {aspectName=testentityprofile, sizeMb=52.234, indexName=es_timeseries_aspect_service_test_testentity_testentityprofileaspect_v1, entityName=testentity} - {aspectName=testentityprofile, sizeMb=0.208, indexName=es_timeseries_aspect_service_test_testentitywithouttests_testentityprofileaspect_v1, entityName=testentitywithouttests} + {aspectName=testentityprofile, sizeMb=52.234, + indexName=es_timeseries_aspect_service_test_testentity_testentityprofileaspect_v1, entityName=testentity} + {aspectName=testentityprofile, sizeMb=0.208, + indexName=es_timeseries_aspect_service_test_testentitywithouttests_testentityprofileaspect_v1, entityName=testentitywithouttests} */ // There may be other indices in there from other tests, so just make sure that index for entity + aspect is in there //CHECKSTYLE:ON diff --git a/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java b/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java index 4c1555fc510e6..34aa6978f742f 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java +++ b/metadata-io/src/test/java/io/datahubproject/test/search/SearchTestContainer.java @@ -5,7 +5,9 @@ import java.time.Duration; public interface SearchTestContainer { + String SEARCH_JAVA_OPTS = "-Xms446m -Xmx446m -XX:MaxDirectMemorySize=368435456"; + Duration STARTUP_TIMEOUT = Duration.ofMinutes(5); // usually < 1min GenericContainer startContainer(); diff --git a/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl index b3e237202fc2f..f777b5d6e12e7 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/ingestion/DataHubIngestionSourceInfo.pdl @@ -37,10 +37,10 @@ record DataHubIngestionSourceInfo { * Parameters associated with the Ingestion Source */ config: record DataHubIngestionSourceConfig { - /** - * The JSON recipe to use for ingestion - */ - recipe: string + /** + * The JSON recipe to use for ingestion + */ + recipe: string /** * The PyPI version of the datahub CLI to use when executing a recipe @@ -56,5 +56,10 @@ record DataHubIngestionSourceInfo { * Whether or not to run this ingestion source in debug mode */ debugMode: optional boolean + + /** + * Extra arguments for the ingestion run. 
+ */ + extraArgs: optional map[string, string] } } \ No newline at end of file From 04216e30bb2a1eab1993e48276d9f8e52d6b3121 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Sat, 21 Oct 2023 17:21:54 +0200 Subject: [PATCH 09/11] feat(ingest/s3): S3 add partition to schema (#8900) Co-authored-by: Pedro Silva --- .../src/datahub/ingestion/source/s3/config.py | 5 ++- .../src/datahub/ingestion/source/s3/source.py | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 9b5296f0b9dd5..3ef6476078f6f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -75,7 +75,10 @@ class DataLakeSourceConfig( default=100, description="Maximum number of rows to use when inferring schemas for TSV and CSV files.", ) - + add_partition_columns_to_schema: bool = Field( + default=False, + description="Whether to add partition fields to the schema.", + ) verify_ssl: Union[bool, str] = Field( default=True, description="Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index eb49fcbb268c0..94c571eabad11 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -78,6 +78,7 @@ NullTypeClass, NumberTypeClass, RecordTypeClass, + SchemaField, SchemaFieldDataType, SchemaMetadata, StringTypeClass, @@ -90,6 +91,7 @@ OperationClass, OperationTypeClass, OtherSchemaClass, + SchemaFieldDataTypeClass, _Aspect, ) from datahub.telemetry import stats, telemetry @@ -458,8 +460,39 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: logger.debug(f"Extracted fields in schema: {fields}") fields = sorted(fields, key=lambda f: f.fieldPath) + if self.source_config.add_partition_columns_to_schema: + self.add_partition_columns_to_schema( + fields=fields, path_spec=path_spec, full_path=table_data.full_path + ) + return fields + def add_partition_columns_to_schema( + self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] + ) -> None: + is_fieldpath_v2 = False + for field in fields: + if field.fieldPath.startswith("[version=2.0]"): + is_fieldpath_v2 = True + break + vars = path_spec.get_named_vars(full_path) + if vars is not None and "partition_key" in vars: + for partition_key in vars["partition_key"].values(): + fields.append( + SchemaField( + fieldPath=f"{partition_key}" + if not is_fieldpath_v2 + else f"[version=2.0].[type=string].{partition_key}", + nativeDataType="string", + type=SchemaFieldDataType(StringTypeClass()) + if not is_fieldpath_v2 + else SchemaFieldDataTypeClass(type=StringTypeClass()), + isPartitioningKey=True, + nullable=True, + recursive=False, + ) + ) + def get_table_profile( self, table_data: TableData, dataset_urn: str ) -> Iterable[MetadataWorkUnit]: From 633e6d6f779dc1e09bdfd10a160889684c485706 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 23 Oct 2023 17:15:44 +0100 Subject: [PATCH 10/11] feat(frontend): Remove debug flag from start script (#9075) --- docker/datahub-frontend/start.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/docker/datahub-frontend/start.sh b/docker/datahub-frontend/start.sh 
index 430982aa2456b..9dc1514144bb1 100755 --- a/docker/datahub-frontend/start.sh +++ b/docker/datahub-frontend/start.sh @@ -50,7 +50,6 @@ export JAVA_OPTS="-Xms512m \ -Djava.security.auth.login.config=datahub-frontend/conf/jaas.conf \ -Dlogback.configurationFile=datahub-frontend/conf/logback.xml \ -Dlogback.debug=false \ - -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 \ ${PROMETHEUS_AGENT:-} ${OTEL_AGENT:-} \ ${TRUSTSTORE_FILE:-} ${TRUSTSTORE_TYPE:-} ${TRUSTSTORE_PASSWORD:-} \ ${HTTP_PROXY:-} ${HTTPS_PROXY:-} ${NO_PROXY:-} \ From 8fb95e88a17260d0d6727f4d5e09636b128faf47 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 23 Oct 2023 12:40:42 -0700 Subject: [PATCH 11/11] feat(sqlparser): parse create DDL statements (#9002) --- .../goldens/v2_sqlite_operator.json | 162 +++++++++++++++--- .../v2_sqlite_operator_no_dag_listener.json | 162 +++++++++++++++--- .../datahub/emitter/sql_parsing_builder.py | 9 +- .../testing/check_sql_parser_result.py | 9 + .../src/datahub/utilities/sqlglot_lineage.py | 92 ++++++++-- .../test_bigquery_create_view_with_cte.json | 8 +- ..._bigquery_from_sharded_table_wildcard.json | 4 +- .../test_bigquery_nested_subqueries.json | 4 +- ..._bigquery_sharded_table_normalization.json | 4 +- .../test_bigquery_star_with_replace.json | 6 +- .../test_bigquery_view_from_union.json | 4 +- .../goldens/test_create_table_ddl.json | 55 +++++- .../goldens/test_create_view_as_select.json | 2 +- .../test_select_from_struct_subfields.json | 2 +- .../test_select_with_full_col_name.json | 2 +- .../test_teradata_default_normalization.json | 2 + 16 files changed, 430 insertions(+), 97 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 1a32b38ce055d..81d0a71b651d9 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -74,9 +74,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": 
\"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -98,7 +96,44 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -157,7 +192,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:24.632190+00:00", + "start_date": "2023-10-15 20:29:10.262813+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -172,7 +207,7 @@ "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056984632, + "time": 1697401750262, "actor": "urn:li:corpuser:datahub" } } @@ -221,7 +256,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056984632, + "timestampMillis": 1697401750262, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -251,9 +286,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", 
\"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -275,7 +308,80 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -331,7 +437,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056984947, + "timestampMillis": 1697401750651, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -447,7 +553,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:28.605901+00:00", + "start_date": "2023-10-15 20:29:15.013834+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -462,7 +568,7 @@ "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056988605, + "time": 1697401755013, "actor": "urn:li:corpuser:datahub" } } @@ -511,7 +617,7 @@ 
"aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056988605, + "timestampMillis": 1697401755013, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -621,7 +727,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056989098, + "timestampMillis": 1697401755600, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -807,7 +913,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:32.888165+00:00", + "start_date": "2023-10-15 20:29:20.216818+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -822,7 +928,7 @@ "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056992888, + "time": 1697401760216, "actor": "urn:li:corpuser:datahub" } } @@ -895,7 +1001,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056992888, + "timestampMillis": 1697401760216, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1131,7 +1237,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056993744, + "timestampMillis": 1697401761237, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1249,7 +1355,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:37.745717+00:00", + "start_date": "2023-10-15 20:29:26.243934+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1264,7 +1370,7 @@ "name": "sqlite_operator_cleanup_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056997745, + "time": 1697401766243, "actor": "urn:li:corpuser:datahub" } } @@ -1313,7 +1419,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056997745, + "timestampMillis": 1697401766243, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1425,7 +1531,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056998672, + "timestampMillis": 1697401767373, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1543,7 +1649,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:42.645806+00:00", + "start_date": "2023-10-15 20:29:32.075613+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1558,7 +1664,7 @@ "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057002645, + "time": 1697401772075, "actor": "urn:li:corpuser:datahub" } } @@ -1607,7 +1713,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057002645, + "timestampMillis": 1697401772075, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1719,7 +1825,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057003759, + "timestampMillis": 1697401773454, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json 
b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index c082be693e30c..96a0f02ccec17 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -74,9 +74,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -98,7 +96,44 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -157,7 +192,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": 
"2023-09-30 07:00:45.832554+00:00", + "start_date": "2023-10-15 20:27:26.883178+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -172,7 +207,7 @@ "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057245832, + "time": 1697401646883, "actor": "urn:li:corpuser:datahub" } } @@ -221,7 +256,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057245832, + "timestampMillis": 1697401646883, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -251,9 +286,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -275,7 +308,80 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -331,7 +437,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057246734, + "timestampMillis": 1697401647826, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -502,7 +608,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:00:49.653938+00:00", + "start_date": "2023-10-15 20:27:31.398799+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -517,7 +623,7 @@ "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057249653, + "time": 1697401651398, "actor": "urn:li:corpuser:datahub" } } @@ -566,7 +672,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057249653, + "timestampMillis": 1697401651398, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -676,7 +782,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057250831, + "timestampMillis": 1697401652651, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -917,7 +1023,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:00:53.989264+00:00", + "start_date": "2023-10-15 20:27:37.697995+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -932,7 +1038,7 @@ "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057253989, + "time": 1697401657697, "actor": "urn:li:corpuser:datahub" } } @@ -1005,7 +1111,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057253989, + "timestampMillis": 1697401657697, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1241,7 +1347,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057255628, + "timestampMillis": 1697401659496, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1414,7 +1520,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:01:00.421177+00:00", + "start_date": "2023-10-15 20:27:45.670215+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1429,7 +1535,7 @@ "name": 
"sqlite_operator_cleanup_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057260421, + "time": 1697401665670, "actor": "urn:li:corpuser:datahub" } } @@ -1478,7 +1584,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057260421, + "timestampMillis": 1697401665670, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1590,7 +1696,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057262258, + "timestampMillis": 1697401667670, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1763,7 +1869,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:01:05.540192+00:00", + "start_date": "2023-10-15 20:27:51.559194+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1778,7 +1884,7 @@ "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057265540, + "time": 1697401671559, "actor": "urn:li:corpuser:datahub" } } @@ -1827,7 +1933,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057265540, + "timestampMillis": 1697401671559, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1939,7 +2045,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057267631, + "timestampMillis": 1697401673788, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py index 071d590f270f8..dedcfa0385f75 100644 --- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py +++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py @@ -179,15 +179,16 @@ def add_lineage( def gen_workunits(self) -> Iterable[MetadataWorkUnit]: if self.generate_lineage: - yield from self._gen_lineage_workunits() + for mcp in self._gen_lineage_mcps(): + yield mcp.as_workunit() if self.generate_usage_statistics: yield from self._gen_usage_statistics_workunits() - def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]: + def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: for downstream_urn in self._lineage_map: upstreams: List[UpstreamClass] = [] fine_upstreams: List[FineGrainedLineageClass] = [] - for upstream_urn, edge in self._lineage_map[downstream_urn].items(): + for edge in self._lineage_map[downstream_urn].values(): upstreams.append(edge.gen_upstream_aspect()) fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects()) @@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]: ) yield MetadataChangeProposalWrapper( entityUrn=downstream_urn, aspect=upstream_lineage - ).as_workunit() + ) def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]: yield from self._usage_aggregator.generate_workunits( diff --git a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py index b3b1331db768b..2b610947e9043 100644 --- a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py +++ b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py @@ -24,6 +24,7 @@ def assert_sql_result_with_resolver( *, expected_file: pathlib.Path, schema_resolver: SchemaResolver, + allow_table_error: 
bool = False, **kwargs: Any, ) -> None: # HACK: Our BigQuery source overwrites this value and doesn't undo it. @@ -36,6 +37,14 @@ def assert_sql_result_with_resolver( **kwargs, ) + if res.debug_info.table_error: + if allow_table_error: + logger.info( + f"SQL parser table error: {res.debug_info.table_error}", + exc_info=res.debug_info.table_error, + ) + else: + raise res.debug_info.table_error if res.debug_info.column_error: logger.warning( f"SQL parser column error: {res.debug_info.column_error}", diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index c830ec8c02fd4..97121b368f507 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -241,9 +241,9 @@ class SqlParsingResult(_ParserBaseModel): ) -def _parse_statement(sql: str, dialect: str) -> sqlglot.Expression: - statement = sqlglot.parse_one( - sql, read=dialect, error_level=sqlglot.ErrorLevel.RAISE +def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Expression: + statement: sqlglot.Expression = sqlglot.maybe_parse( + sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE ) return statement @@ -467,14 +467,20 @@ def _column_level_lineage( # noqa: C901 default_db: Optional[str], default_schema: Optional[str], ) -> List[_ColumnLineageInfo]: - if not isinstance( - statement, - _SupportedColumnLineageTypesTuple, + is_create_ddl = _is_create_table_ddl(statement) + if ( + not isinstance( + statement, + _SupportedColumnLineageTypesTuple, + ) + and not is_create_ddl ): raise UnsupportedStatementTypeError( f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}" ) + column_lineage: List[_ColumnLineageInfo] = [] + use_case_insensitive_cols = dialect in { # Column identifiers are case-insensitive in BigQuery, so we need to # do a normalization step beforehand to make sure it's resolved correctly. @@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve( ) from e logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect)) + # Handle the create DDL case. + if is_create_ddl: + assert ( + output_table is not None + ), "output_table must be set for create DDL statements" + + create_schema: sqlglot.exp.Schema = statement.this + sqlglot_columns = create_schema.expressions + + for column_def in sqlglot_columns: + if not isinstance(column_def, sqlglot.exp.ColumnDef): + # Ignore things like constraints. + continue + + output_col = _schema_aware_fuzzy_column_resolve( + output_table, column_def.name + ) + output_col_type = column_def.args.get("kind") + + column_lineage.append( + _ColumnLineageInfo( + downstream=_DownstreamColumnRef( + table=output_table, + column=output_col, + column_type=output_col_type, + ), + upstreams=[], + ) + ) + + return column_lineage + # Try to figure out the types of the output columns. try: statement = sqlglot.optimizer.annotate_types.annotate_types( @@ -589,8 +627,6 @@ def _schema_aware_fuzzy_column_resolve( # This is not a fatal error, so we can continue. 
logger.debug("sqlglot failed to annotate types: %s", e) - column_lineage = [] - try: assert isinstance(statement, _SupportedColumnLineageTypesTuple) @@ -599,7 +635,6 @@ def _schema_aware_fuzzy_column_resolve( (select_col.alias_or_name, select_col) for select_col in statement.selects ] logger.debug("output columns: %s", [col[0] for col in output_columns]) - output_col: str for output_col, original_col_expression in output_columns: if output_col == "*": # If schema information is available, the * will be expanded to the actual columns. @@ -628,7 +663,7 @@ def _schema_aware_fuzzy_column_resolve( # Generate SELECT lineage. # Using a set here to deduplicate upstreams. - direct_col_upstreams: Set[_ColumnRef] = set() + direct_raw_col_upstreams: Set[_ColumnRef] = set() for node in lineage_node.walk(): if node.downstream: # We only want the leaf nodes. @@ -643,8 +678,9 @@ def _schema_aware_fuzzy_column_resolve( if node.subfield: normalized_col = f"{normalized_col}.{node.subfield}" - col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col) - direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col)) + direct_raw_col_upstreams.add( + _ColumnRef(table=table_ref, column=normalized_col) + ) else: # This branch doesn't matter. For example, a count(*) column would go here, and # we don't get any column-level lineage for that. @@ -665,7 +701,16 @@ def _schema_aware_fuzzy_column_resolve( if original_col_expression.type: output_col_type = original_col_expression.type - if not direct_col_upstreams: + # Fuzzy resolve upstream columns. + direct_resolved_col_upstreams = { + _ColumnRef( + table=edge.table, + column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column), + ) + for edge in direct_raw_col_upstreams + } + + if not direct_resolved_col_upstreams: logger.debug(f' "{output_col}" has no upstreams') column_lineage.append( _ColumnLineageInfo( @@ -674,12 +719,12 @@ def _schema_aware_fuzzy_column_resolve( column=output_col, column_type=output_col_type, ), - upstreams=sorted(direct_col_upstreams), + upstreams=sorted(direct_resolved_col_upstreams), # logic=column_logic.sql(pretty=True, dialect=dialect), ) ) - # TODO: Also extract referenced columns (e.g. 
non-SELECT lineage) + # TODO: Also extract referenced columns (aka auxillary / non-SELECT lineage) except (sqlglot.errors.OptimizeError, ValueError) as e: raise SqlUnderstandingError( f"sqlglot failed to compute some lineage: {e}" @@ -700,6 +745,12 @@ def _extract_select_from_create( return statement +def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool: + return isinstance(statement, sqlglot.exp.Create) and isinstance( + statement.this, sqlglot.exp.Schema + ) + + def _try_extract_select( statement: sqlglot.exp.Expression, ) -> sqlglot.exp.Expression: @@ -766,6 +817,7 @@ def _translate_sqlglot_type( def _translate_internal_column_lineage( table_name_urn_mapping: Dict[_TableName, str], raw_column_lineage: _ColumnLineageInfo, + dialect: str, ) -> ColumnLineageInfo: downstream_urn = None if raw_column_lineage.downstream.table: @@ -779,7 +831,9 @@ def _translate_internal_column_lineage( ) if raw_column_lineage.downstream.column_type else None, - native_column_type=raw_column_lineage.downstream.column_type.sql() + native_column_type=raw_column_lineage.downstream.column_type.sql( + dialect=dialect + ) if raw_column_lineage.downstream.column_type and raw_column_lineage.downstream.column_type.this != sqlglot.exp.DataType.Type.UNKNOWN @@ -800,12 +854,14 @@ def _get_dialect(platform: str) -> str: # TODO: convert datahub platform names to sqlglot dialect if platform == "presto-on-hive": return "hive" + if platform == "mssql": + return "tsql" else: return platform def _sqlglot_lineage_inner( - sql: str, + sql: sqlglot.exp.ExpOrStr, schema_resolver: SchemaResolver, default_db: Optional[str] = None, default_schema: Optional[str] = None, @@ -918,7 +974,7 @@ def _sqlglot_lineage_inner( if column_lineage: column_lineage_urns = [ _translate_internal_column_lineage( - table_name_urn_mapping, internal_col_lineage + table_name_urn_mapping, internal_col_lineage, dialect=dialect ) for internal_col_lineage in column_lineage ] diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json index f0175b4dc8892..d610b0a83f229 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json @@ -18,7 +18,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -36,7 +36,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -54,7 +54,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -72,7 +72,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json index b7df5444987f2..2d3d188d28316 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - 
"native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -32,7 +32,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json index 67e306bebf545..41ae0885941b0 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -32,7 +32,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json index b7df5444987f2..2d3d188d28316 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -32,7 +32,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json index b393b2445d6c4..26f8f8f59a3ff 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json @@ -16,7 +16,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -34,7 +34,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -52,7 +52,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json index 53fb94300e804..83365c09f69c2 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json @@ -17,7 +17,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -39,7 +39,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json index 
4773974545bfa..cf31b71cb50f6 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json @@ -4,5 +4,58 @@ "out_tables": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)" ], - "column_lineage": null + "column_lineage": [ + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "id", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "INTEGER" + }, + "upstreams": [] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "month", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" + }, + "upstreams": [] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "total_cost", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "REAL" + }, + "upstreams": [] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "area", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "REAL" + }, + "upstreams": [] + } + ] } \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json index ff452467aa5bd..8a6b60d0f1bde 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json @@ -30,7 +30,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "BIGINT" + "native_column_type": "NUMBER" }, "upstreams": [] }, diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json index 5ad847e252497..2424fcda34752 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "DECIMAL" + "native_column_type": "NUMERIC" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json index 6ee3d2e61c39b..8dd2633eff612 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "DECIMAL" + "native_column_type": "NUMERIC" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json index b0351a7e07ad2..ee80285d87f60 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json +++ 
b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json @@ -12,6 +12,7 @@ "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)", "column": "PatientId", + "column_type": null, "native_column_type": "INTEGER()" }, "upstreams": [ @@ -25,6 +26,7 @@ "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)", "column": "BMI", + "column_type": null, "native_column_type": "FLOAT()" }, "upstreams": [