diff --git a/datahub-frontend/app/controllers/Application.java b/datahub-frontend/app/controllers/Application.java index d31cb13fa7b41..5c76f2572a936 100644 --- a/datahub-frontend/app/controllers/Application.java +++ b/datahub-frontend/app/controllers/Application.java @@ -136,11 +136,11 @@ public CompletableFuture proxy(String path, Http.Request request) throws .stream() // Remove X-DataHub-Actor to prevent malicious delegation. .filter(entry -> !AuthenticationConstants.LEGACY_X_DATAHUB_ACTOR_HEADER.equalsIgnoreCase(entry.getKey())) - .filter(entry -> !Http.HeaderNames.CONTENT_LENGTH.equals(entry.getKey())) - .filter(entry -> !Http.HeaderNames.CONTENT_TYPE.equals(entry.getKey())) - .filter(entry -> !Http.HeaderNames.AUTHORIZATION.equals(entry.getKey())) + .filter(entry -> !Http.HeaderNames.CONTENT_LENGTH.equalsIgnoreCase(entry.getKey())) + .filter(entry -> !Http.HeaderNames.CONTENT_TYPE.equalsIgnoreCase(entry.getKey())) + .filter(entry -> !Http.HeaderNames.AUTHORIZATION.equalsIgnoreCase(entry.getKey())) // Remove Host s.th. 
service meshes do not route to wrong host - .filter(entry -> !Http.HeaderNames.HOST.equals(entry.getKey())) + .filter(entry -> !Http.HeaderNames.HOST.equalsIgnoreCase(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) ) .addHeader(Http.HeaderNames.AUTHORIZATION, authorizationHeaderValue) @@ -152,8 +152,8 @@ public CompletableFuture proxy(String path, Http.Request request) throws final ResponseHeader header = new ResponseHeader(apiResponse.getStatus(), apiResponse.getHeaders() .entrySet() .stream() - .filter(entry -> !Http.HeaderNames.CONTENT_LENGTH.equals(entry.getKey())) - .filter(entry -> !Http.HeaderNames.CONTENT_TYPE.equals(entry.getKey())) + .filter(entry -> !Http.HeaderNames.CONTENT_LENGTH.equalsIgnoreCase(entry.getKey())) + .filter(entry -> !Http.HeaderNames.CONTENT_TYPE.equalsIgnoreCase(entry.getKey())) .map(entry -> Pair.of(entry.getKey(), String.join(";", entry.getValue()))) .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond))); final HttpEntity body = new HttpEntity.Strict(apiResponse.getBodyAsBytes(), Optional.ofNullable(apiResponse.getContentType())); diff --git a/datahub-web-react/src/App.tsx b/datahub-web-react/src/App.tsx index 1d9c9cbd27dbb..68a4b93d71481 100644 --- a/datahub-web-react/src/App.tsx +++ b/datahub-web-react/src/App.tsx @@ -48,7 +48,8 @@ const errorLink = onError((error) => { if (serverError.statusCode === 401) { isLoggedInVar(false); Cookies.remove(GlobalCfg.CLIENT_AUTH_COOKIE); - window.location.replace(PageRoutes.AUTHENTICATE); + const currentPath = window.location.pathname + window.location.search; + window.location.replace(`${PageRoutes.AUTHENTICATE}?redirect_uri=${encodeURIComponent(currentPath)}`); } } if (graphQLErrors && graphQLErrors.length) { diff --git a/datahub-web-react/src/app/context/UserContextProvider.tsx b/datahub-web-react/src/app/context/UserContextProvider.tsx index 4a9f24cdea30e..3bcff15cc2748 100644 --- a/datahub-web-react/src/app/context/UserContextProvider.tsx +++ 
b/datahub-web-react/src/app/context/UserContextProvider.tsx @@ -3,7 +3,6 @@ import { useGetMeLazyQuery } from '../../graphql/me.generated'; import { useGetGlobalViewsSettingsLazyQuery } from '../../graphql/app.generated'; import { CorpUser, PlatformPrivileges } from '../../types.generated'; import { UserContext, LocalState, DEFAULT_STATE, State } from './userContext'; -import { useInitialRedirect } from './useInitialRedirect'; // TODO: Migrate all usage of useAuthenticatedUser to using this provider. @@ -125,11 +124,6 @@ const UserContextProvider = ({ children }: { children: React.ReactNode }) => { } }, [state, localState.selectedViewUrn, setDefaultSelectedView]); - /** - * Route to the most recently visited path once on first load of home page, if present in local storage. - */ - useInitialRedirect(state, localState, setState, updateLocalState); - return ( { - if (!state.loadedInitialPath) { - if (location.pathname === PageRoutes.ROOT && localState.selectedPath !== location.pathname) { - if (localState.selectedPath && !localState.selectedPath.includes(PageRoutes.EMBED)) { - history.replace({ - pathname: localState.selectedPath, - search: localState.selectedSearch || '', - }); - } - } - setState({ - ...state, - loadedInitialPath: true, - }); - } - }, [ - localState.selectedPath, - localState.selectedSearch, - location.pathname, - location.search, - state, - history, - setState, - ]); - - /** - * When the location of the browse changes, save the latest to local state. 
- */ - useEffect(() => { - if ( - (localState.selectedPath !== location.pathname || localState.selectedSearch !== location.search) && - !location.pathname.includes(PageRoutes.EMBED) - ) { - setLocalState({ - ...localState, - selectedPath: location.pathname, - selectedSearch: location.search, - }); - } - }, [location.pathname, location.search, localState, setLocalState]); -} diff --git a/datahub-web-react/src/app/context/userContext.tsx b/datahub-web-react/src/app/context/userContext.tsx index b6cbe6250e4ed..2e174d38247f5 100644 --- a/datahub-web-react/src/app/context/userContext.tsx +++ b/datahub-web-react/src/app/context/userContext.tsx @@ -22,10 +22,6 @@ export type State = { loadedPersonalDefaultViewUrn: boolean; hasSetDefaultView: boolean; }; - /** - * Whether the initial page path has been loaded. - */ - loadedInitialPath: boolean; }; /** @@ -54,7 +50,6 @@ export const DEFAULT_STATE: State = { loadedPersonalDefaultViewUrn: false, hasSetDefaultView: false, }, - loadedInitialPath: false, }; export const DEFAULT_CONTEXT = { diff --git a/datahub-web-react/src/app/entity/group/AddGroupMembersModal.tsx b/datahub-web-react/src/app/entity/group/AddGroupMembersModal.tsx index 25637fe91137a..45287e823523b 100644 --- a/datahub-web-react/src/app/entity/group/AddGroupMembersModal.tsx +++ b/datahub-web-react/src/app/entity/group/AddGroupMembersModal.tsx @@ -83,9 +83,9 @@ export const AddGroupMembersModal = ({ urn, visible, onCloseModal, onSubmit }: P setSelectedMembers(newUsers); }; - const onDeselectMember = (memberUrn: string) => { + const onDeselectMember = (memberUrn: { key: string; label: React.ReactNode; value: string }) => { setInputValue(''); - const newUserActors = selectedMembers.filter((user) => user !== memberUrn); + const newUserActors = selectedMembers.filter((user) => user.value !== memberUrn.value); setSelectedMembers(newUserActors); }; diff --git a/datahub-web-react/src/app/entity/shared/__tests__/siblingsUtils.test.ts 
b/datahub-web-react/src/app/entity/shared/__tests__/siblingsUtils.test.ts index a15c4bc20ef29..6e23d5400ab77 100644 --- a/datahub-web-react/src/app/entity/shared/__tests__/siblingsUtils.test.ts +++ b/datahub-web-react/src/app/entity/shared/__tests__/siblingsUtils.test.ts @@ -195,6 +195,7 @@ const searchResultWithSiblings = [ { entity: { urn: 'urn:li:dataset:(urn:li:dataPlatform:bigquery,cypress_project.jaffle_shop.raw_orders,PROD)', + exists: true, type: 'DATASET', name: 'cypress_project.jaffle_shop.raw_orders', origin: 'PROD', @@ -328,6 +329,7 @@ const searchResultWithSiblings = [ siblings: [ { urn: 'urn:li:dataset:(urn:li:dataPlatform:dbt,cypress_project.jaffle_shop.raw_orders,PROD)', + exists: true, type: 'DATASET', platform: { urn: 'urn:li:dataPlatform:dbt', @@ -376,6 +378,7 @@ const searchResultWithSiblings = [ { entity: { urn: 'urn:li:dataset:(urn:li:dataPlatform:dbt,cypress_project.jaffle_shop.raw_orders,PROD)', + exists: true, type: 'DATASET', name: 'cypress_project.jaffle_shop.raw_orders', origin: 'PROD', @@ -513,6 +516,169 @@ const searchResultWithSiblings = [ }, ]; +const searchResultWithGhostSiblings = [ + { + entity: { + urn: 'urn:li:dataset:(urn:li:dataPlatform:bigquery,cypress_project.jaffle_shop.raw_orders,PROD)', + exists: true, + type: 'DATASET', + name: 'cypress_project.jaffle_shop.raw_orders', + origin: 'PROD', + uri: null, + platform: { + urn: 'urn:li:dataPlatform:bigquery', + type: 'DATA_PLATFORM', + name: 'bigquery', + properties: { + type: 'RELATIONAL_DB', + displayName: 'BigQuery', + datasetNameDelimiter: '.', + logoUrl: '/assets/platforms/bigquerylogo.png', + __typename: 'DataPlatformProperties', + }, + displayName: null, + info: null, + __typename: 'DataPlatform', + }, + dataPlatformInstance: null, + editableProperties: null, + platformNativeType: null, + properties: { + name: 'raw_orders', + description: null, + qualifiedName: null, + customProperties: [], + __typename: 'DatasetProperties', + }, + ownership: null, + globalTags: null, + 
glossaryTerms: null, + subTypes: { + typeNames: ['table'], + __typename: 'SubTypes', + }, + domain: null, + container: { + urn: 'urn:li:container:348c96555971d3f5c1ffd7dd2e7446cb', + platform: { + urn: 'urn:li:dataPlatform:bigquery', + type: 'DATA_PLATFORM', + name: 'bigquery', + properties: { + type: 'RELATIONAL_DB', + displayName: 'BigQuery', + datasetNameDelimiter: '.', + logoUrl: '/assets/platforms/bigquerylogo.png', + __typename: 'DataPlatformProperties', + }, + displayName: null, + info: null, + __typename: 'DataPlatform', + }, + properties: { + name: 'jaffle_shop', + __typename: 'ContainerProperties', + }, + subTypes: { + typeNames: ['Dataset'], + __typename: 'SubTypes', + }, + deprecation: null, + __typename: 'Container', + }, + parentContainers: { + count: 2, + containers: [ + { + urn: 'urn:li:container:348c96555971d3f5c1ffd7dd2e7446cb', + platform: { + urn: 'urn:li:dataPlatform:bigquery', + type: 'DATA_PLATFORM', + name: 'bigquery', + properties: { + type: 'RELATIONAL_DB', + displayName: 'BigQuery', + datasetNameDelimiter: '.', + logoUrl: '/assets/platforms/bigquerylogo.png', + __typename: 'DataPlatformProperties', + }, + displayName: null, + info: null, + __typename: 'DataPlatform', + }, + properties: { + name: 'jaffle_shop', + __typename: 'ContainerProperties', + }, + subTypes: { + typeNames: ['Dataset'], + __typename: 'SubTypes', + }, + deprecation: null, + __typename: 'Container', + }, + { + urn: 'urn:li:container:b5e95fce839e7d78151ed7e0a7420d84', + platform: { + urn: 'urn:li:dataPlatform:bigquery', + type: 'DATA_PLATFORM', + name: 'bigquery', + properties: { + type: 'RELATIONAL_DB', + displayName: 'BigQuery', + datasetNameDelimiter: '.', + logoUrl: '/assets/platforms/bigquerylogo.png', + __typename: 'DataPlatformProperties', + }, + displayName: null, + info: null, + __typename: 'DataPlatform', + }, + properties: { + name: 'cypress_project', + __typename: 'ContainerProperties', + }, + subTypes: { + typeNames: ['Project'], + __typename: 'SubTypes', + 
}, + deprecation: null, + __typename: 'Container', + }, + ], + __typename: 'ParentContainersResult', + }, + deprecation: null, + siblings: { + isPrimary: false, + siblings: [ + { + urn: 'urn:li:dataset:(urn:li:dataPlatform:dbt,cypress_project.jaffle_shop.raw_orders,PROD)', + exists: false, + type: 'DATASET', + }, + ], + __typename: 'SiblingProperties', + }, + __typename: 'Dataset', + }, + matchedFields: [ + { + name: 'name', + value: 'raw_orders', + __typename: 'MatchedField', + }, + { + name: 'id', + value: 'cypress_project.jaffle_shop.raw_orders', + __typename: 'MatchedField', + }, + ], + insights: [], + __typename: 'SearchResult', + }, +]; + describe('siblingUtils', () => { describe('combineEntityDataWithSiblings', () => { it('combines my metadata with my siblings as primary', () => { @@ -564,6 +730,18 @@ describe('siblingUtils', () => { expect(result?.[0]?.matchedEntities?.[1]?.urn).toEqual( 'urn:li:dataset:(urn:li:dataPlatform:bigquery,cypress_project.jaffle_shop.raw_orders,PROD)', ); + + expect(result?.[0]?.matchedEntities).toHaveLength(2); + }); + + it('will not combine an entity with a ghost node', () => { + const result = combineSiblingsInSearchResults(searchResultWithGhostSiblings as any); + + expect(result).toHaveLength(1); + expect(result?.[0]?.matchedEntities?.[0]?.urn).toEqual( + 'urn:li:dataset:(urn:li:dataPlatform:bigquery,cypress_project.jaffle_shop.raw_orders,PROD)', + ); + expect(result?.[0]?.matchedEntities).toHaveLength(1); }); }); diff --git a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/SidebarSiblingsSection.tsx b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/SidebarSiblingsSection.tsx index 4b0089e6b9214..4ea1ab69e44b3 100644 --- a/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/SidebarSiblingsSection.tsx +++ b/datahub-web-react/src/app/entity/shared/containers/profile/sidebar/SidebarSiblingsSection.tsx @@ -4,7 +4,7 @@ import styled from 'styled-components'; import { 
useDataNotCombinedWithSiblings, useEntityData } from '../../../EntityContext'; import { SidebarHeader } from './SidebarHeader'; import { CompactEntityNameList } from '../../../../../recommendations/renderer/component/CompactEntityNameList'; -import { Entity } from '../../../../../../types.generated'; +import { Dataset, Entity } from '../../../../../../types.generated'; import { SEPARATE_SIBLINGS_URL_PARAM, stripSiblingsFromEntity, useIsSeparateSiblingsMode } from '../../../siblingUtils'; import { GetDatasetQuery } from '../../../../../../graphql/dataset.generated'; @@ -36,14 +36,22 @@ export const SidebarSiblingsSection = () => { const siblingEntities = entityData?.siblings?.siblings || []; const entityDataWithoutSiblings = stripSiblingsFromEntity(dataNotCombinedWithSiblings.dataset); - const allSiblingsInGroup = [...siblingEntities, entityDataWithoutSiblings] as Entity[]; + const allSiblingsInGroup = [...siblingEntities, entityDataWithoutSiblings] as Dataset[]; + + const allSiblingsInGroupThatExist = allSiblingsInGroup.filter((sibling) => sibling.exists); + + // you are always going to be in the sibling group, so if the sibling group is just you do not render. + // The less than case is likely not necessary but just there as a safety case for unexpected scenarios + if (allSiblingsInGroupThatExist.length <= 1) { + return <>; + } return (
diff --git a/datahub-web-react/src/app/entity/shared/siblingUtils.ts b/datahub-web-react/src/app/entity/shared/siblingUtils.ts index 47687e4050b9b..2cad28d754a80 100644 --- a/datahub-web-react/src/app/entity/shared/siblingUtils.ts +++ b/datahub-web-react/src/app/entity/shared/siblingUtils.ts @@ -2,7 +2,7 @@ import merge from 'deepmerge'; import { unionBy, keyBy, values } from 'lodash'; import { useLocation } from 'react-router-dom'; import * as QueryString from 'query-string'; -import { Entity, MatchedField, Maybe, SiblingProperties } from '../../../types.generated'; +import { Dataset, Entity, MatchedField, Maybe, SiblingProperties } from '../../../types.generated'; export function stripSiblingsFromEntity(entity: any) { return { @@ -235,6 +235,11 @@ export function combineSiblingsInSearchResults( combinedResult.matchedEntities = entity.siblings.isPrimary ? [stripSiblingsFromEntity(entity), ...entity.siblings.siblings] : [...entity.siblings.siblings, stripSiblingsFromEntity(entity)]; + + combinedResult.matchedEntities = combinedResult.matchedEntities.filter( + (resultToFilter) => (resultToFilter as Dataset).exists, + ); + siblingUrns.forEach((urn) => { siblingsToPair[urn] = combinedResult; }); diff --git a/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts b/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts index 505b3d94531b7..4dd54ea25416d 100644 --- a/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts +++ b/datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts @@ -138,7 +138,10 @@ export function filterColumns( node: { x: number; y: number; data: Omit }, setColumnsByUrn: (value: React.SetStateAction>) => void, ) { - const filteredFields = node.data.schemaMetadata?.fields.filter((field) => field.fieldPath.includes(filterText)); + const formattedFilterText = filterText.toLocaleLowerCase(); + const filteredFields = node.data.schemaMetadata?.fields.filter((field) => + 
field.fieldPath.toLocaleLowerCase().includes(formattedFilterText), + ); if (filteredFields) { setColumnsByUrn((colsByUrn) => ({ ...colsByUrn, diff --git a/datahub-web-react/src/app/search/SearchResultList.tsx b/datahub-web-react/src/app/search/SearchResultList.tsx index a793348db6f1d..b860e7b670c33 100644 --- a/datahub-web-react/src/app/search/SearchResultList.tsx +++ b/datahub-web-react/src/app/search/SearchResultList.tsx @@ -151,7 +151,9 @@ export const SearchResultList = ({ )} {entityRegistry.renderSearchResult(item.entity.type, item)} - {item.matchedEntities && item.matchedEntities.length > 0 && ( + {/* an entity is always going to be inserted in the sibling group, so if the sibling group is just one do not + render. */} + {item.matchedEntities && item.matchedEntities.length > 1 && ( { {/* HeroAnnouncement goes here */}
-

The #1 Open Source Data Catalog

+

The #1 Open Source Metadata Platform

- DataHub's extensible metadata platform enables data discovery, data observability and federated governance that helps tame the + DataHub is an extensible metadata platform that enables data discovery, data observability and federated governance to help tame the complexity of your data ecosystem.

+

+Built with ❤️ by Acryl Data and LinkedIn. +

Get Started → diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 5498c96d91af6..d6738f4b9782c 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -401,7 +401,9 @@ def get_long_description(): "types-cachetools", # versions 0.1.13 and 0.1.14 seem to have issues "types-click==0.1.12", - "boto3-stubs[s3,glue,sagemaker,sts]>=1.28.4", + # The boto3-stubs package seems to have regularly breaking minor releases, + # we pin to a specific version to avoid this. + "boto3-stubs[s3,glue,sagemaker,sts]==1.28.15", "types-tabulate", # avrogen package requires this "types-pytz", diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py index 75fae47c966fd..b8b96c6306a3b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py @@ -24,7 +24,7 @@ from mypy_boto3_sagemaker import SageMakerClient from mypy_boto3_sagemaker.type_defs import ( DescribeFeatureGroupResponseTypeDef, - FeatureDefinitionOutputTypeDef, + FeatureDefinitionTypeDef, FeatureGroupSummaryTypeDef, ) @@ -147,7 +147,7 @@ def get_feature_type(self, aws_type: str, feature_name: str) -> str: def get_feature_wu( self, feature_group_details: "DescribeFeatureGroupResponseTypeDef", - feature: "FeatureDefinitionOutputTypeDef", + feature: "FeatureDefinitionTypeDef", ) -> MetadataWorkUnit: """ Generate an MLFeature workunit for a SageMaker feature. 
diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index e7c60435e13f8..32666ceecdf85 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -252,11 +252,11 @@ def sanitize_owner_ids(self, owner_id: str) -> str: def get_match(self, match_clause: Any, raw_props_value: Any) -> Optional[Match]: # function to check if a match clause is satisfied to a value. - if type(raw_props_value) not in Constants.OPERAND_DATATYPE_SUPPORTED or type( - raw_props_value - ) != type(match_clause): + if not any( + isinstance(raw_props_value, t) for t in Constants.OPERAND_DATATYPE_SUPPORTED + ) or not isinstance(raw_props_value, type(match_clause)): return None - elif type(raw_props_value) == str: + elif isinstance(raw_props_value, str): return re.match(match_clause, raw_props_value) else: return re.match(str(match_clause), str(raw_props_value)) diff --git a/metadata-ingestion/src/datahub_provider/_plugin.py b/metadata-ingestion/src/datahub_provider/_plugin.py index b4ac5a36c6eae..6f6c7c9ab71b7 100644 --- a/metadata-ingestion/src/datahub_provider/_plugin.py +++ b/metadata-ingestion/src/datahub_provider/_plugin.py @@ -107,7 +107,7 @@ def get_inlets_from_task(task: BaseOperator, context: Any) -> Iterable[Any]: ] for inlet in task_inlets: - if type(inlet) != str: + if not isinstance(inlet, str): inlets.append(inlet) return inlets diff --git a/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_operator.py b/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_operator.py index 89a037324e7cb..28be8ad860179 100644 --- a/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_operator.py +++ b/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_operator.py @@ -62,9 +62,9 @@ def execute(self, context: Any) -> bool: return True self.log.info(f"Checking if dataset {self.urn} is ready to be consumed") - if 
type(self.urn) == str: + if isinstance(self.urn, str): urns = [self.urn] - elif type(self.urn) == list: + elif isinstance(self.urn, list): urns = self.urn else: raise Exception(f"urn parameter has invalid type {type(self.urn)}") diff --git a/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_sensor.py b/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_sensor.py index 55a3492f9c8d6..ceb970dd8dc7f 100644 --- a/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_sensor.py +++ b/metadata-ingestion/src/datahub_provider/operators/datahub_assertion_sensor.py @@ -61,9 +61,9 @@ def poke(self, context: Any) -> bool: return True self.log.info(f"Checking if dataset {self.urn} is ready to be consumed") - if type(self.urn) == str: + if isinstance(self.urn, str): urns = [self.urn] - elif type(self.urn) == list: + elif isinstance(self.urn, list): urns = self.urn else: raise Exception(f"urn parameter has invalid type {type(self.urn)}") diff --git a/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py b/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py index e5e45c2bf4694..6b2535994c101 100644 --- a/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py +++ b/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py @@ -76,9 +76,9 @@ def execute(self, context: Any) -> bool: return True self.log.info(f"Checking if dataset {self.urn} is ready to be consumed") - if type(self.urn) == str: + if isinstance(self.urn, str): urns = [self.urn] - elif type(self.urn) == list: + elif isinstance(self.urn, list): urns = self.urn else: raise Exception(f"urn parameter has invalid type {type(self.urn)}") diff --git a/metadata-ingestion/src/datahub_provider/operators/datahub_operation_sensor.py b/metadata-ingestion/src/datahub_provider/operators/datahub_operation_sensor.py index 31b387a7e65b0..8796215453500 100644 --- 
a/metadata-ingestion/src/datahub_provider/operators/datahub_operation_sensor.py +++ b/metadata-ingestion/src/datahub_provider/operators/datahub_operation_sensor.py @@ -78,9 +78,9 @@ def poke(self, context: Any) -> bool: return True self.log.info(f"Checking if dataset {self.urn} is ready to be consumed") - if type(self.urn) == str: + if isinstance(self.urn, str): urns = [self.urn] - elif type(self.urn) == list: + elif isinstance(self.urn, list): urns = self.urn else: raise Exception(f"urn parameter has invalid type {type(self.urn)}") diff --git a/metadata-ingestion/tests/unit/test_glue_source.py b/metadata-ingestion/tests/unit/test_glue_source.py index 23dc3b97e09b5..8fb840ee003c7 100644 --- a/metadata-ingestion/tests/unit/test_glue_source.py +++ b/metadata-ingestion/tests/unit/test_glue_source.py @@ -89,7 +89,7 @@ def test_column_type(hive_column_type: str, expected_type: Type) -> None: ) schema_fields = avro_schema_to_mce_fields(json.dumps(avro_schema)) actual_schema_field_type = schema_fields[0].type - assert type(actual_schema_field_type.type) == expected_type + assert isinstance(actual_schema_field_type.type, expected_type) @pytest.mark.parametrize( diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index 33496b868adb7..b48ebf12ee37a 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -132,7 +132,9 @@ def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_cl # DataPlatform aspect should be present when platform_instance is configured data_platform_aspects = [ - asp for asp in proposed_snap.aspects if type(asp) == DataPlatformInstanceClass + asp + for asp in proposed_snap.aspects + if isinstance(asp, DataPlatformInstanceClass) ] assert len(data_platform_aspects) == 1 assert data_platform_aspects[0].instance == make_dataplatform_instance_urn( @@ -141,7 +143,7 @@ def 
test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_cl # The default browse path should include the platform_instance value browse_path_aspects = [ - asp for asp in proposed_snap.aspects if type(asp) == BrowsePathsClass + asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass) ] assert len(browse_path_aspects) == 1 assert f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}" in browse_path_aspects[0].paths @@ -177,13 +179,15 @@ def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_clie # DataPlatform aspect should not be present when platform_instance is not configured data_platform_aspects = [ - asp for asp in proposed_snap.aspects if type(asp) == DataPlatformInstanceClass + asp + for asp in proposed_snap.aspects + if isinstance(asp, DataPlatformInstanceClass) ] assert len(data_platform_aspects) == 0 # The default browse path should include the platform_instance value browse_path_aspects = [ - asp for asp in proposed_snap.aspects if type(asp) == BrowsePathsClass + asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass) ] assert len(browse_path_aspects) == 1 assert f"/prod/{PLATFORM}" in browse_path_aspects[0].paths diff --git a/metadata-ingestion/tests/unit/utilities/test_file_backed_collections.py b/metadata-ingestion/tests/unit/utilities/test_file_backed_collections.py index 582e3814c4f6d..2d7556eb88341 100644 --- a/metadata-ingestion/tests/unit/utilities/test_file_backed_collections.py +++ b/metadata-ingestion/tests/unit/utilities/test_file_backed_collections.py @@ -256,7 +256,7 @@ def test_shared_connection() -> None: iterator = cache2.sql_query_iterator( f"SELECT y, sum(x) FROM {cache2.tablename} GROUP BY y ORDER BY y" ) - assert type(iterator) == sqlite3.Cursor + assert isinstance(iterator, sqlite3.Cursor) assert [tuple(r) for r in iterator] == [("a", 15), ("b", 11)] # Test joining between the two tables. 
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java index bf74b1025267f..9967df9207ec7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java @@ -3,6 +3,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; +import com.linkedin.metadata.utils.metrics.MetricUtils; import io.ebean.PagedList; import javax.annotation.Nonnull; @@ -28,6 +29,8 @@ * worth looking into ways to move this responsibility inside {@link AspectDao} implementations. */ public interface AspectDao { + String ASPECT_WRITE_COUNT_METRIC_NAME = "aspectWriteCount"; + String ASPECT_WRITE_BYTES_METRIC_NAME = "aspectWriteBytes"; @Nullable EntityAspect getAspect(@Nonnull final String urn, @Nonnull final String aspectName, final long version); @@ -116,4 +119,11 @@ ListResult listAspectMetadata( @Nonnull T runInTransactionWithRetry(@Nonnull final Supplier block, final int maxTransactionRetry); + + default void incrementWriteMetrics(String aspectName, long count, long bytes) { + MetricUtils.counter(this.getClass(), + String.join(MetricUtils.DELIMITER, List.of(ASPECT_WRITE_COUNT_METRIC_NAME, aspectName))).inc(count); + MetricUtils.counter(this.getClass(), + String.join(MetricUtils.DELIMITER, List.of(ASPECT_WRITE_BYTES_METRIC_NAME, aspectName))).inc(bytes); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index c7a9895992d90..e070944b49a05 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1811,6 +1811,8 @@ public RollbackResult 
deleteAspect(String urn, String aspectName, @Nonnull Map deleteByQuery(QueryBuilder queryBuilder, b deleteByQueryRequest.indices(indices); try { - // flush pending writes - bulkProcessor.flush(); + if (!batchDelete) { + // flush pending writes + bulkProcessor.flush(); + } // perform delete after local flush final BulkByScrollResponse deleteResponse = searchClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc(deleteResponse.getTotal()); diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 18f042b65d0b1..5290dd4adceac 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -168,6 +168,7 @@ elasticsearch: numRetries: ${ES_BULK_NUM_RETRIES:3} retryInterval: ${ES_BULK_RETRY_INTERVAL:1} refreshPolicy: ${ES_BULK_REFRESH_POLICY:NONE} + enableBatchDelete: ${ES_BULK_ENABLE_BATCH_DELETE:false} index: prefix: ${INDEX_PREFIX:} numShards: ${ELASTICSEARCH_NUM_SHARDS_PER_INDEX:1} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java index 60bb89cf3c589..956157f70e6bc 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java @@ -41,6 +41,9 @@ public class ElasticSearchBulkProcessorFactory { @Value("#{new Boolean('${elasticsearch.bulkProcessor.async}')}") private boolean async; + @Value("#{new Boolean('${elasticsearch.bulkProcessor.enableBatchDelete}')}") + private boolean enableBatchDelete; + @Value("${elasticsearch.bulkProcessor.refreshPolicy}") 
private String refreshPolicy; @@ -53,6 +56,7 @@ protected ESBulkProcessor getInstance() { .bulkRequestsLimit(bulkRequestsLimit) .retryInterval(retryInterval) .numRetries(numRetries) + .batchDelete(enableBatchDelete) .writeRequestRefreshPolicy(WriteRequest.RefreshPolicy.valueOf(refreshPolicy)) .build(); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java index e3f3a9c5b76a5..11d12072e12b7 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java @@ -108,6 +108,10 @@ public void checkSystemVersion(final ConsumerRecord consu } public void waitForUpdate() { + if (!_configurationProvider.getSystemUpdate().isWaitForSystemUpdate()) { + log.warn("Wait for system update is disabled. Proceeding with startup."); + IS_UPDATED.getAndSet(true); + } int maxBackOffs = Integer.parseInt(_configurationProvider.getSystemUpdate().getMaxBackOffs()); long initialBackOffMs = Long.parseLong(_configurationProvider.getSystemUpdate().getInitialBackOffMs()); int backOffFactor = Integer.parseInt(_configurationProvider.getSystemUpdate().getBackOffFactor());