diff --git a/datahub-web-react/src/app/lineage/LineageLoadingSection.tsx b/datahub-web-react/src/app/lineage/LineageLoadingSection.tsx
new file mode 100644
index 0000000000000..9d84de0c21172
--- /dev/null
+++ b/datahub-web-react/src/app/lineage/LineageLoadingSection.tsx
@@ -0,0 +1,27 @@
+import * as React from 'react';
+import styled from 'styled-components';
+import { LoadingOutlined } from '@ant-design/icons';
+import { ANTD_GRAY } from '../entity/shared/constants';
+
+const Container = styled.div`
+ height: auto;
+ width: 100%;
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ background-color: rgb(250, 250, 250);
+`;
+
+const StyledLoading = styled(LoadingOutlined)`
+ font-size: 36px;
+ color: ${ANTD_GRAY[7]};
+ padding-bottom: 18px;
+`;
+
+export default function LineageLoadingSection() {
+ return (
+ <Container>
+ <StyledLoading />
+ </Container>
+ );
+}
diff --git a/datahub-web-react/src/app/search/CommandK.tsx b/datahub-web-react/src/app/search/CommandK.tsx
new file mode 100644
index 0000000000000..13e55a0e3f266
--- /dev/null
+++ b/datahub-web-react/src/app/search/CommandK.tsx
@@ -0,0 +1,29 @@
+import React from 'react';
+import styled from 'styled-components';
+import { ANTD_GRAY } from '../entity/shared/constants';
+
+const Container = styled.div`
+ color: ${ANTD_GRAY[6]};
+ background-color: #ffffff;
+ opacity: 0.9;
+ border-color: black;
+ border-radius: 6px;
+ border: 1px solid ${ANTD_GRAY[6]};
+ padding-right: 6px;
+ padding-left: 6px;
+ margin-right: 4px;
+ margin-left: 4px;
+`;
+
+const Letter = styled.span`
+ padding: 2px;
+`;
+
+export const CommandK = () => {
+ return (
+ <Container>
+ <Letter>⌘</Letter>
+ <Letter>K</Letter>
+ </Container>
+ );
+};
diff --git a/datahub-web-react/src/app/search/SearchBar.tsx b/datahub-web-react/src/app/search/SearchBar.tsx
index 5f797e68fe0e8..a23ead83caf54 100644
--- a/datahub-web-react/src/app/search/SearchBar.tsx
+++ b/datahub-web-react/src/app/search/SearchBar.tsx
@@ -23,6 +23,7 @@ import { navigateToSearchUrl } from './utils/navigateToSearchUrl';
import ViewAllSearchItem from './ViewAllSearchItem';
import { ViewSelect } from '../entity/view/select/ViewSelect';
import { combineSiblingsInAutoComplete } from './utils/combineSiblingsInAutoComplete';
+import { CommandK } from './CommandK';
const StyledAutoComplete = styled(AutoComplete)`
width: 100%;
@@ -114,6 +115,7 @@ interface Props {
fixAutoComplete?: boolean;
hideRecommendations?: boolean;
showQuickFilters?: boolean;
+ showCommandK?: boolean;
viewsEnabled?: boolean;
combineSiblings?: boolean;
setIsSearchBarFocused?: (isSearchBarFocused: boolean) => void;
@@ -142,6 +144,7 @@ export const SearchBar = ({
fixAutoComplete,
hideRecommendations,
showQuickFilters,
+ showCommandK = false,
viewsEnabled = false,
combineSiblings = false,
setIsSearchBarFocused,
@@ -153,6 +156,8 @@ export const SearchBar = ({
const [searchQuery, setSearchQuery] = useState(initialQuery);
const [selected, setSelected] = useState<string>();
const [isDropdownVisible, setIsDropdownVisible] = useState(false);
+ const [isFocused, setIsFocused] = useState(false);
+
useEffect(() => setSelected(initialQuery), [initialQuery]);
const searchEntityTypes = entityRegistry.getSearchEntityTypes();
@@ -277,11 +282,13 @@ export const SearchBar = ({
function handleFocus() {
if (onFocus) onFocus();
handleSearchBarClick(true);
+ setIsFocused(true);
}
function handleBlur() {
if (onBlur) onBlur();
handleSearchBarClick(false);
+ setIsFocused(false);
}
function handleSearch(query: string, type?: EntityType, appliedQuickFilters?: FacetFilterInput[]) {
@@ -294,18 +301,21 @@ export const SearchBar = ({
const searchInputRef = useRef(null);
useEffect(() => {
- const handleKeyDown = (event) => {
- // Support command-k to select the search bar.
- // 75 is the keyCode for 'k'
- if ((event.metaKey || event.ctrlKey) && event.keyCode === 75) {
- (searchInputRef?.current as any)?.focus();
- }
- };
- document.addEventListener('keydown', handleKeyDown);
- return () => {
- document.removeEventListener('keydown', handleKeyDown);
- };
- }, []);
+ if (showCommandK) {
+ const handleKeyDown = (event) => {
+ // Support command-k to select the search bar.
+ // 75 is the keyCode for 'k'
+ if ((event.metaKey || event.ctrlKey) && event.keyCode === 75) {
+ (searchInputRef?.current as any)?.focus();
+ }
+ };
+ document.addEventListener('keydown', handleKeyDown);
+ return () => {
+ document.removeEventListener('keydown', handleKeyDown);
+ };
+ }
+ return () => null;
+ }, [showCommandK]);
return (
@@ -377,7 +387,7 @@ export const SearchBar = ({
data-testid="search-input"
onFocus={handleFocus}
onBlur={handleBlur}
- allowClear={{ clearIcon: }}
+ allowClear={(isFocused && { clearIcon: }) || false}
prefix={
<>
{viewsEnabled && (
@@ -411,6 +421,7 @@ export const SearchBar = ({
>
}
ref={searchInputRef}
+ suffix={(showCommandK && !isFocused && <CommandK />) || null}
/>
diff --git a/datahub-web-react/src/app/search/SearchHeader.tsx b/datahub-web-react/src/app/search/SearchHeader.tsx
index 91f9753a3d601..76e78a11d3e9d 100644
--- a/datahub-web-react/src/app/search/SearchHeader.tsx
+++ b/datahub-web-react/src/app/search/SearchHeader.tsx
@@ -108,6 +108,7 @@ export const SearchHeader = ({
fixAutoComplete
showQuickFilters
showViewAllResults
+ showCommandK
/>
diff --git a/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx b/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx
index 3f46f35889fd1..4a7a4938ea970 100644
--- a/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx
+++ b/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx
@@ -105,20 +105,20 @@ export function HeaderLinks(props: Props) {
View and modify your data dictionary
-
+
}
>
diff --git a/datahub-web-react/yarn.lock b/datahub-web-react/yarn.lock
index 590f3ebcef8c3..ce0f2f514dad1 100644
--- a/datahub-web-react/yarn.lock
+++ b/datahub-web-react/yarn.lock
@@ -2298,6 +2298,14 @@
"@graphql-tools/utils" "^6"
tslib "~2.0.1"
+"@graphql-codegen/fragment-matcher@^5.0.0":
+ version "5.0.0"
+ resolved "https://registry.yarnpkg.com/@graphql-codegen/fragment-matcher/-/fragment-matcher-5.0.0.tgz#2a016715e42e8f21aa08830f34a4d0a930e660fe"
+ integrity sha512-mbash9E8eY6RSMSNrrO+C9JJEn8rdr8ORaxMpgdWL2qe2q/TlLUCE3ZvQvHkSc7GjBnMEk36LncA8ApwHR2BHg==
+ dependencies:
+ "@graphql-codegen/plugin-helpers" "^5.0.0"
+ tslib "~2.5.0"
+
"@graphql-codegen/near-operation-file-preset@^1.17.13":
version "1.18.6"
resolved "https://registry.yarnpkg.com/@graphql-codegen/near-operation-file-preset/-/near-operation-file-preset-1.18.6.tgz#2378ac75feaeaa1cfd2146bd84bf839b1fe20d9d"
@@ -2331,6 +2339,18 @@
lodash "~4.17.0"
tslib "~2.3.0"
+"@graphql-codegen/plugin-helpers@^5.0.0":
+ version "5.0.1"
+ resolved "https://registry.yarnpkg.com/@graphql-codegen/plugin-helpers/-/plugin-helpers-5.0.1.tgz#e2429fcfba3f078d5aa18aa062d46c922bbb0d55"
+ integrity sha512-6L5sb9D8wptZhnhLLBcheSPU7Tg//DGWgc5tQBWX46KYTOTQHGqDpv50FxAJJOyFVJrveN9otWk9UT9/yfY4ww==
+ dependencies:
+ "@graphql-tools/utils" "^10.0.0"
+ change-case-all "1.0.15"
+ common-tags "1.8.2"
+ import-from "4.0.0"
+ lodash "~4.17.0"
+ tslib "~2.5.0"
+
"@graphql-codegen/typescript-operations@1.17.13":
version "1.17.13"
resolved "https://registry.yarnpkg.com/@graphql-codegen/typescript-operations/-/typescript-operations-1.17.13.tgz#a5b08c1573b9507ca5a9e66e795aecc40ddc5305"
@@ -2584,6 +2604,16 @@
dependencies:
tslib "^2.4.0"
+"@graphql-tools/utils@^10.0.0":
+ version "10.0.8"
+ resolved "https://registry.yarnpkg.com/@graphql-tools/utils/-/utils-10.0.8.tgz#c7b84275ec83dc42ad9f3d4ffc424ff682075759"
+ integrity sha512-yjyA8ycSa1WRlJqyX/aLqXeE5DvF/H02+zXMUFnCzIDrj0UvLMUrxhmVFnMK0Q2n3bh4uuTeY3621m5za9ovXw==
+ dependencies:
+ "@graphql-typed-document-node/core" "^3.1.1"
+ cross-inspect "1.0.0"
+ dset "^3.1.2"
+ tslib "^2.4.0"
+
"@graphql-tools/utils@^6":
version "6.2.4"
resolved "https://registry.yarnpkg.com/@graphql-tools/utils/-/utils-6.2.4.tgz#38a2314d2e5e229ad4f78cca44e1199e18d55856"
@@ -2618,6 +2648,11 @@
resolved "https://registry.yarnpkg.com/@graphql-typed-document-node/core/-/core-3.1.0.tgz#0eee6373e11418bfe0b5638f654df7a4ca6a3950"
integrity sha512-wYn6r8zVZyQJ6rQaALBEln5B1pzxb9shV5Ef97kTvn6yVGrqyXVnDqnU24MXnFubR+rZjBY9NWuxX3FB2sTsjg==
+"@graphql-typed-document-node/core@^3.1.1":
+ version "3.2.0"
+ resolved "https://registry.yarnpkg.com/@graphql-typed-document-node/core/-/core-3.2.0.tgz#5f3d96ec6b2354ad6d8a28bf216a1d97b5426861"
+ integrity sha512-mB9oAsNCm9aM3/SOv4YtBMqZbYj10R7dkq8byBqxGY/ncFwhf2oQzMV+LCRlWoDSEBJ3COiR1yeDvMtsoOsuFQ==
+
"@hapi/address@2.x.x":
version "2.1.4"
resolved "https://registry.yarnpkg.com/@hapi/address/-/address-2.1.4.tgz#5d67ed43f3fd41a69d4b9ff7b56e7c0d1d0a81e5"
@@ -7001,6 +7036,22 @@ change-case-all@1.0.14:
upper-case "^2.0.2"
upper-case-first "^2.0.2"
+change-case-all@1.0.15:
+ version "1.0.15"
+ resolved "https://registry.yarnpkg.com/change-case-all/-/change-case-all-1.0.15.tgz#de29393167fc101d646cd76b0ef23e27d09756ad"
+ integrity sha512-3+GIFhk3sNuvFAJKU46o26OdzudQlPNBCu1ZQi3cMeMHhty1bhDxu2WrEilVNYaGvqUtR1VSigFcJOiS13dRhQ==
+ dependencies:
+ change-case "^4.1.2"
+ is-lower-case "^2.0.2"
+ is-upper-case "^2.0.2"
+ lower-case "^2.0.2"
+ lower-case-first "^2.0.2"
+ sponge-case "^1.0.1"
+ swap-case "^2.0.2"
+ title-case "^3.0.3"
+ upper-case "^2.0.2"
+ upper-case-first "^2.0.2"
+
change-case@^4.1.2:
version "4.1.2"
resolved "https://registry.yarnpkg.com/change-case/-/change-case-4.1.2.tgz#fedfc5f136045e2398c0410ee441f95704641e12"
@@ -7357,6 +7408,11 @@ common-tags@1.8.0, common-tags@^1.8.0:
resolved "https://registry.yarnpkg.com/common-tags/-/common-tags-1.8.0.tgz#8e3153e542d4a39e9b10554434afaaf98956a937"
integrity sha512-6P6g0uetGpW/sdyUy/iQQCbFF0kWVMSIVSyYz7Zgjcgh8mgw8PQzDNZeyZ5DQ2gM7LBoZPHmnjz8rUthkBG5tw==
+common-tags@1.8.2:
+ version "1.8.2"
+ resolved "https://registry.yarnpkg.com/common-tags/-/common-tags-1.8.2.tgz#94ebb3c076d26032745fd54face7f688ef5ac9c6"
+ integrity sha512-gk/Z852D2Wtb//0I+kRFNKKE9dIIVirjoqPoA1wJU+XePVXZfGeBpk45+A1rKO4Q43prqWBNY/MiIeRLbPWUaA==
+
commondir@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/commondir/-/commondir-1.0.1.tgz#ddd800da0c66127393cca5950ea968a3aaf1253b"
@@ -7698,6 +7754,13 @@ cross-fetch@^3.1.5:
dependencies:
node-fetch "2.6.7"
+cross-inspect@1.0.0:
+ version "1.0.0"
+ resolved "https://registry.yarnpkg.com/cross-inspect/-/cross-inspect-1.0.0.tgz#5fda1af759a148594d2d58394a9e21364f6849af"
+ integrity sha512-4PFfn4b5ZN6FMNGSZlyb7wUhuN8wvj8t/VQHZdM4JsDcruGJ8L2kf9zao98QIrBPFCpdk27qst/AGTl7pL3ypQ==
+ dependencies:
+ tslib "^2.4.0"
+
cross-spawn@7.0.3, cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3:
version "7.0.3"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.3.tgz#f73a85b9d5d41d045551c177e2882d4ac85728a6"
@@ -8595,6 +8658,11 @@ dotenv@^8.2.0:
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-8.6.0.tgz#061af664d19f7f4d8fc6e4ff9b584ce237adcb8b"
integrity sha512-IrPdXQsk2BbzvCBGBOTmmSH5SodmqZNt4ERAZDmW4CT+tL8VtvinqywuANaFu4bOMWki16nqf0e4oC0QIaDr/g==
+dset@^3.1.2:
+ version "3.1.3"
+ resolved "https://registry.yarnpkg.com/dset/-/dset-3.1.3.tgz#c194147f159841148e8e34ca41f638556d9542d2"
+ integrity sha512-20TuZZHCEZ2O71q9/+8BwKwZ0QtD9D8ObhrihJPr+vLLYlSuAU3/zL4cSlgbfeoGHTjCSJBa7NGcrF9/Bx/WJQ==
+
duplexer3@^0.1.4:
version "0.1.4"
resolved "https://registry.yarnpkg.com/duplexer3/-/duplexer3-0.1.4.tgz#ee01dd1cac0ed3cbc7fdbea37dc0a8f1ce002ce2"
@@ -18712,6 +18780,11 @@ tslib@~2.3.0:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01"
integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw==
+tslib@~2.5.0:
+ version "2.5.3"
+ resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.5.3.tgz#24944ba2d990940e6e982c4bea147aba80209913"
+ integrity sha512-mSxlJJwl3BMEQCUNnxXBU9jP4JBktcEGhURcPR6VQVlnP0FdDEsIaz0C35dXNGLyRfrATNofF0F5p2KPxQgB+w==
+
tsutils@^3.17.1:
version "3.21.0"
resolved "https://registry.yarnpkg.com/tsutils/-/tsutils-3.21.0.tgz#b48717d394cea6c1e096983eed58e9d61715b623"
diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py
index a5af881022d8c..e88fc870cb333 100644
--- a/metadata-ingestion-modules/airflow-plugin/setup.py
+++ b/metadata-ingestion-modules/airflow-plugin/setup.py
@@ -101,6 +101,10 @@ def get_long_description():
f"acryl-datahub[testing-utils]{_self_pin}",
# Extra requirements for loading our test dags.
"apache-airflow[snowflake]>=2.0.2",
+ # Connexion's new version breaks Airflow:
+ # See https://github.com/apache/airflow/issues/35234.
+ # TODO: We should transition to using Airflow's constraints file.
+ "connexion<3",
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350
# Eventually we want to set this to "snowflake-sqlalchemy>=1.4.3".
# However, that doesn't work with older versions of Airflow. Instead
diff --git a/metadata-ingestion/adding-source.md b/metadata-ingestion/adding-source.md
index a0930102c6827..6baddf6b2010d 100644
--- a/metadata-ingestion/adding-source.md
+++ b/metadata-ingestion/adding-source.md
@@ -6,7 +6,7 @@ There are two ways of adding a metadata ingestion source.
2. You are writing the custom source for yourself and are not going to contribute back (yet).
If you are going for case (1) just follow the steps 1 to 9 below. In case you are building it for yourself you can skip
-steps 4-9 (but maybe write tests and docs for yourself as well) and follow the documentation
+steps 4-8 (but maybe write tests and docs for yourself as well) and follow the documentation
on [how to use custom ingestion sources](../docs/how/add-custom-ingestion-source.md)
without forking Datahub.
@@ -27,6 +27,7 @@ from `ConfigModel`. The [file source](./src/datahub/ingestion/source/file.py) is
We use [pydantic](https://pydantic-docs.helpmanual.io) conventions for documenting configuration flags. Use the `description` attribute to write rich documentation for your configuration field.
For example, the following code:
+
```python
from pydantic import Field
from datahub.api.configuration.common import ConfigModel
@@ -49,12 +50,10 @@ generates the following documentation:
-
:::note
Inline markdown or code snippets are not yet supported for field level documentation.
:::
-
### 2. Set up the reporter
The reporter interface enables the source to report statistics, warnings, failures, and other information about the run.
@@ -71,6 +70,8 @@ some [convenience methods](./src/datahub/emitter/mce_builder.py) for commonly us
### 4. Set up the dependencies
+Note: Steps 4-8 are only required if you intend to contribute the source back to the Datahub project.
+
Declare the source's pip dependencies in the `plugins` variable of the [setup script](./setup.py).
### 5. Enable discoverability
@@ -119,37 +120,38 @@ from datahub.ingestion.api.decorators import (
@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
class FileSource(Source):
"""
-
- The File Source can be used to produce all kinds of metadata from a generic metadata events file.
+
+ The File Source can be used to produce all kinds of metadata from a generic metadata events file.
:::note
Events in this file can be in MCE form or MCP form.
:::
-
+
"""
... source code goes here
```
-
#### 7.2 Write custom documentation
-- Create a copy of [`source-docs-template.md`](./source-docs-template.md) and edit all relevant components.
+- Create a copy of [`source-docs-template.md`](./source-docs-template.md) and edit all relevant components.
- Name the document as `<plugin>.md` and move it to `metadata-ingestion/docs/sources/<platform>/<plugin>.md`. For example, for the Kafka platform, under the `kafka` plugin, move the document to `metadata-ingestion/docs/sources/kafka/kafka.md`.
- Add a quickstart recipe corresponding to the plugin under `metadata-ingestion/docs/sources/<platform>/<plugin>_recipe.yml`. For example, for the Kafka platform, under the `kafka` plugin, there is a quickstart recipe located at `metadata-ingestion/docs/sources/kafka/kafka_recipe.yml`.
- To write platform-specific documentation (that is cross-plugin), write the documentation under `metadata-ingestion/docs/sources/<platform>/README.md`. For example, cross-plugin documentation for the BigQuery platform is located under `metadata-ingestion/docs/sources/bigquery/README.md`.
#### 7.3 Viewing the Documentation
-Documentation for the source can be viewed by running the documentation generator from the `docs-website` module.
+Documentation for the source can be viewed by running the documentation generator from the `docs-website` module.
##### Step 1: Build the Ingestion docs
+
```console
# From the root of DataHub repo
./gradlew :metadata-ingestion:docGen
```
If this finishes successfully, you will see output messages like:
+
```console
Ingestion Documentation Generation Complete
############################################
@@ -170,7 +172,8 @@ Ingestion Documentation Generation Complete
You can also find documentation files generated at `./docs/generated/ingestion/sources` relative to the root of the DataHub repo. You should be able to locate your specific source's markdown file here and investigate it to make sure things look as expected.
#### Step 2: Build the Entire Documentation
-To view how this documentation looks in the browser, there is one more step. Just build the entire docusaurus page from the `docs-website` module.
+
+To view how this documentation looks in the browser, there is one more step. Just build the entire docusaurus page from the `docs-website` module.
```console
# From the root of DataHub repo
@@ -178,6 +181,7 @@ To view how this documentation looks in the browser, there is one more step. Jus
```
This will generate messages like:
+
```console
...
> Task :docs-website:yarnGenerate
@@ -219,15 +223,15 @@ BUILD SUCCESSFUL in 35s
36 actionable tasks: 16 executed, 20 up-to-date
```
-After this you need to run the following script from the `docs-website` module.
+After this you need to run the following script from the `docs-website` module.
+
```console
cd docs-website
npm run serve
```
-Now, browse to http://localhost:3000 or whichever port npm is running on, to browse the docs.
-Your source should show up on the left sidebar under `Metadata Ingestion / Sources`.
-
+Now, browse to http://localhost:3000, or whichever port npm is running on, to view the docs.
+Your source should show up on the left sidebar under `Metadata Ingestion / Sources`.
### 8. Add SQL Alchemy mapping (if applicable)
diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py
index 5e4427047104f..b390ffb9dd036 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py
@@ -93,10 +93,20 @@ def capability(
"""
def wrapper(cls: Type) -> Type:
- if not hasattr(cls, "__capabilities"):
+ if not hasattr(cls, "__capabilities") or any(
+ # It's from this class and not a superclass.
+ cls.__capabilities is getattr(base, "__capabilities", None)
+ for base in cls.__bases__
+ ):
cls.__capabilities = {}
cls.get_capabilities = lambda: cls.__capabilities.values()
+ # If the superclasses have capability annotations, copy those over.
+ for base in cls.__bases__:
+ base_caps = getattr(base, "__capabilities", None)
+ if base_caps:
+ cls.__capabilities.update(base_caps)
+
cls.__capabilities[capability_name] = CapabilitySetting(
capability=capability_name, description=description, supported=supported
)
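
Why the extra identity check in this hunk matters: Python class attributes are inherited, so `hasattr(cls, "__capabilities")` is also true for a subclass of an already-decorated source, and without the guard the subclass would mutate its parent's capability dict in place. A minimal sketch of the pitfall, using hypothetical names in place of the decorator machinery:

```python
class Base:
    pass

# What a decorator conceptually does on first application:
Base.caps = {"base_cap": True}


class Child(Base):
    pass

# The attribute is "present" on Child, but only via inheritance,
# and it is the very same dict object as the parent's:
assert hasattr(Child, "caps")
assert Child.caps is Base.caps

# Mutating it through the subclass would silently pollute the parent:
Child.caps["child_cap"] = True
assert "child_cap" in Base.caps

# The fix mirrored above: give the subclass its own dict, seeded
# with a copy of the parent's entries.
Child.caps = dict(Base.caps)
assert Child.caps is not Base.caps
```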
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py
index 96184d8d445e4..e4f1bb275487e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py
@@ -1,9 +1,11 @@
import json
import logging
from datetime import datetime
-from typing import Dict, Iterable, Optional, Tuple
+from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar
from sqlalchemy import create_engine
+from sqlalchemy.engine import Row
+from typing_extensions import Protocol
from datahub.emitter.aspect import ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -20,6 +22,62 @@
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
+class VersionOrderable(Protocol):
+ createdon: Any # Should restrict to only orderable types
+ version: int
+
+
+ROW = TypeVar("ROW", bound=VersionOrderable)
+
+
+class VersionOrderer(Generic[ROW]):
+ """Orders rows by (createdon, version == 0).
+
+ That is, orders rows first by createdon, and for equal timestamps, puts version 0 rows last.
+ """
+
+ def __init__(self, enabled: bool):
+ # Stores all version 0 aspects for a given createdon timestamp
+ # Once we have emitted all aspects for a given timestamp, we can emit the version 0 aspects
+ # Guaranteeing that, for a given timestamp, we always ingest version 0 aspects last
+ self.queue: Optional[Tuple[datetime, List[ROW]]] = None
+ self.enabled = enabled
+
+ def __call__(self, rows: Iterable[ROW]) -> Iterable[ROW]:
+ for row in rows:
+ yield from self._process_row(row)
+ yield from self._flush_queue()
+
+ def _process_row(self, row: ROW) -> Iterable[ROW]:
+ if not self.enabled:
+ yield row
+ return
+
+ yield from self._attempt_queue_flush(row)
+ if row.version == 0:
+ self._add_to_queue(row)
+ else:
+ yield row
+
+ def _add_to_queue(self, row: ROW) -> None:
+ if self.queue is None:
+ self.queue = (row.createdon, [row])
+ else:
+ self.queue[1].append(row)
+
+ def _attempt_queue_flush(self, row: ROW) -> Iterable[ROW]:
+ if self.queue is None:
+ return
+
+ if row.createdon > self.queue[0]:
+ yield from self._flush_queue()
+
+ def _flush_queue(self) -> Iterable[ROW]:
+ if self.queue is not None:
+ yield from self.queue[1]
+ self.queue = None
+
+
class DataHubDatabaseReader:
def __init__(
self,
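
The contract of `VersionOrderer` above: rows arrive ordered by `createdon`, and within a single `createdon` value all version-0 rows are held back and emitted only once the timestamp advances (or the stream ends), so the "latest" aspect always lands last for its timestamp. A small worked trace, assuming a hypothetical dataclass satisfying `VersionOrderable`:

```python
from dataclasses import dataclass


@dataclass
class FakeRow:  # hypothetical stand-in for a database row
    createdon: int
    version: int


rows = [FakeRow(0, 0), FakeRow(0, 1), FakeRow(1, 0)]

# orderer = VersionOrderer[FakeRow](enabled=True)
# list(orderer(rows))
# => [FakeRow(createdon=0, version=1),   # non-zero versions pass through
#     FakeRow(createdon=0, version=0),   # buffered until createdon advances
#     FakeRow(createdon=1, version=0)]   # flushed at end of stream
```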
@@ -40,13 +98,14 @@ def query(self) -> str:
# Offset is generally 0, unless we repeat the same createdon twice
# Ensures stable order, chronological per (urn, aspect)
- # Version 0 last, only when createdon is the same. Otherwise relies on createdon order
+ # Relies on createdon order to reflect version order
+ # Ordering of entries with the same createdon is handled by VersionOrderer
return f"""
- SELECT urn, aspect, metadata, systemmetadata, createdon
+ SELECT urn, aspect, metadata, systemmetadata, createdon, version
FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)}
WHERE createdon >= %(since_createdon)s
{"" if self.config.include_all_versions else "AND version = 0"}
- ORDER BY createdon, urn, aspect, CASE WHEN version = 0 THEN 1 ELSE 0 END, version
+ ORDER BY createdon, urn, aspect, version
LIMIT %(limit)s
OFFSET %(offset)s
"""
@@ -54,6 +113,14 @@ def query(self) -> str:
def get_aspects(
self, from_createdon: datetime, stop_time: datetime
) -> Iterable[Tuple[MetadataChangeProposalWrapper, datetime]]:
+ orderer = VersionOrderer[Row](enabled=self.config.include_all_versions)
+ rows = self._get_rows(from_createdon=from_createdon, stop_time=stop_time)
+ for row in orderer(rows):
+ mcp = self._parse_row(row)
+ if mcp:
+ yield mcp, row.createdon
+
+ def _get_rows(self, from_createdon: datetime, stop_time: datetime) -> Iterable[Row]:
with self.engine.connect() as conn:
ts = from_createdon
offset = 0
@@ -69,34 +136,31 @@ def get_aspects(
return
for i, row in enumerate(rows):
- row_dict = row._asdict()
- mcp = self._parse_row(row_dict)
- if mcp:
- yield mcp, row_dict["createdon"]
+ yield row
- if ts == row_dict["createdon"]:
- offset += i
+ if ts == row.createdon:
+ offset += i + 1
else:
- ts = row_dict["createdon"]
+ ts = row.createdon
offset = 0
- def _parse_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
+ def _parse_row(self, row: Row) -> Optional[MetadataChangeProposalWrapper]:
try:
- json_aspect = post_json_transform(json.loads(d["metadata"]))
- json_metadata = post_json_transform(json.loads(d["systemmetadata"] or "{}"))
+ json_aspect = post_json_transform(json.loads(row.metadata))
+ json_metadata = post_json_transform(json.loads(row.systemmetadata or "{}"))
system_metadata = SystemMetadataClass.from_obj(json_metadata)
return MetadataChangeProposalWrapper(
- entityUrn=d["urn"],
- aspect=ASPECT_MAP[d["aspect"]].from_obj(json_aspect),
+ entityUrn=row.urn,
+ aspect=ASPECT_MAP[row.aspect].from_obj(json_aspect),
systemMetadata=system_metadata,
changeType=ChangeTypeClass.UPSERT,
)
except Exception as e:
logger.warning(
- f"Failed to parse metadata for {d['urn']}: {e}", exc_info=True
+ f"Failed to parse metadata for {row.urn}: {e}", exc_info=True
)
self.report.num_database_parse_errors += 1
self.report.database_parse_errors.setdefault(
str(e), LossyDict()
- ).setdefault(d["aspect"], LossyList()).append(d["urn"])
+ ).setdefault(row.aspect, LossyList()).append(row.urn)
return None
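
A note on the `offset += i + 1` change in `_get_rows`: the query filters `createdon >= ts` and paginates with OFFSET, so when a page ends on the same timestamp it started with, the next query must skip every row already consumed. Since `i` is the zero-based index of the page's last row, that count is `i + 1`; the previous `offset += i` re-read one row per page. A worked check, under an assumed page size:

```python
limit = 3                      # assumed page size, for illustration
# Suppose one page returns rows that all share a single createdon value.
rows_consumed = limit
i = rows_consumed - 1          # zero-based index of the page's last row

assert i == 2                  # old bookkeeping: offset += 2 -> row 2 re-read
assert i + 1 == rows_consumed  # new bookkeeping: offset += 3 -> no overlap
```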
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py
index 7fb2cf9813cab..d11b1f9ad6a53 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py
@@ -15,11 +15,12 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.decorators import capability
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
IngestionCheckpointingProviderBase,
JobId,
)
-from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.source.state.checkpoint import Checkpoint, StateType
from datahub.ingestion.source.state.use_case_handler import (
StatefulIngestionUsecaseHandlerBase,
@@ -177,6 +178,11 @@ class StatefulIngestionReport(SourceReport):
pass
+@capability(
+ SourceCapability.DELETION_DETECTION,
+ "Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
+ supported=True,
+)
class StatefulIngestionSourceBase(Source):
"""
Defines the base class for all stateful sources.
diff --git a/metadata-ingestion/tests/unit/test_datahub_source.py b/metadata-ingestion/tests/unit/test_datahub_source.py
new file mode 100644
index 0000000000000..adc131362b326
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_datahub_source.py
@@ -0,0 +1,51 @@
+from dataclasses import dataclass
+
+import pytest
+
+from datahub.ingestion.source.datahub.datahub_database_reader import (
+ VersionOrderable,
+ VersionOrderer,
+)
+
+
+@dataclass
+class MockRow(VersionOrderable):
+ createdon: int
+ version: int
+ urn: str
+
+
+@pytest.fixture
+def rows():
+ return [
+ MockRow(0, 0, "one"),
+ MockRow(0, 1, "one"),
+ MockRow(0, 0, "two"),
+ MockRow(0, 0, "three"),
+ MockRow(0, 1, "three"),
+ MockRow(0, 2, "three"),
+ MockRow(0, 1, "two"),
+ MockRow(0, 4, "three"),
+ MockRow(0, 5, "three"),
+ MockRow(1, 6, "three"),
+ MockRow(1, 0, "four"),
+ MockRow(2, 0, "five"),
+ MockRow(2, 1, "six"),
+ MockRow(2, 0, "six"),
+ MockRow(3, 0, "seven"),
+ MockRow(3, 0, "eight"),
+ ]
+
+
+def test_version_orderer(rows):
+ orderer = VersionOrderer[MockRow](enabled=True)
+ ordered_rows = list(orderer(rows))
+ assert ordered_rows == sorted(
+ ordered_rows, key=lambda x: (x.createdon, x.version == 0)
+ )
+
+
+def test_version_orderer_disabled(rows):
+ orderer = VersionOrderer[MockRow](enabled=False)
+ ordered_rows = list(orderer(rows))
+ assert ordered_rows == rows
diff --git a/metadata-service/restli-api/build.gradle b/metadata-service/restli-api/build.gradle
index ed4f4118dba30..f182d11b6baeb 100644
--- a/metadata-service/restli-api/build.gradle
+++ b/metadata-service/restli-api/build.gradle
@@ -8,4 +8,10 @@ dependencies {
restClientCompile spec.product.pegasus.d2
restClientCompile spec.product.pegasus.restliClient
+
+ constraints {
+ restClientCompile(externalDependency.zookeeper) {
+ because("CVE-2023-44981")
+ }
+ }
}
\ No newline at end of file