Merge branch 'master' into auto_lowercase_dataset_urns
treff7es committed Oct 12, 2023
2 parents 5c3c7f4 + 84bba4d commit 9052d94
Showing 116 changed files with 2,254 additions and 2,007 deletions.
@@ -821,6 +821,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("glossaryNode", getResolver(glossaryNodeType))
.dataFetcher("domain", getResolver((domainType)))
.dataFetcher("dataPlatform", getResolver(dataPlatformType))
.dataFetcher("dataPlatformInstance", getResolver(dataPlatformInstanceType))
.dataFetcher("mlFeatureTable", getResolver(mlFeatureTableType))
.dataFetcher("mlFeature", getResolver(mlFeatureType))
.dataFetcher("mlPrimaryKey", getResolver(mlPrimaryKeyType))
@@ -1291,7 +1292,8 @@ private void configureCorpUserResolvers(final RuntimeWiring.Builder builder) {
*/
private void configureCorpGroupResolvers(final RuntimeWiring.Builder builder) {
builder.type("CorpGroup", typeWiring -> typeWiring
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient)));
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher("exists", new EntityExistsResolver(entityService)));
builder.type("CorpGroupInfo", typeWiring -> typeWiring
.dataFetcher("admins",
new LoadableTypeBatchResolver<>(corpUserType,
@@ -4,16 +4,25 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.types.dataplatforminstance.mappers.DataPlatformInstanceMapper;
import com.linkedin.datahub.graphql.types.mappers.AutoCompleteResultsMapper;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.filter.Filter;
import graphql.execution.DataFetcherResult;
import org.apache.commons.lang3.NotImplementedException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -22,7 +31,10 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public class DataPlatformInstanceType implements com.linkedin.datahub.graphql.types.EntityType<DataPlatformInstance, String> {
import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ENTITY_NAME;

public class DataPlatformInstanceType implements SearchableEntityType<DataPlatformInstance, String>,
com.linkedin.datahub.graphql.types.EntityType<DataPlatformInstance, String> {

static final Set<String> ASPECTS_TO_FETCH = ImmutableSet.of(
Constants.DATA_PLATFORM_INSTANCE_KEY_ASPECT_NAME,
@@ -84,4 +96,24 @@ public List<DataFetcherResult<DataPlatformInstance>> batchLoad(@Nonnull List<Str
}
}

@Override
public SearchResults search(@Nonnull String query,
@Nullable List<FacetFilterInput> filters,
int start,
int count,
@Nonnull final QueryContext context) throws Exception {
throw new NotImplementedException("Searchable type (deprecated) not implemented on DataPlatformInstance entity type");
}

@Override
public AutoCompleteResults autoComplete(@Nonnull String query,
@Nullable String field,
@Nullable Filter filters,
int limit,
@Nonnull final QueryContext context) throws Exception {
final AutoCompleteResult result = _entityClient.autoComplete(DATA_PLATFORM_INSTANCE_ENTITY_NAME, query,
filters, limit, context.getAuthentication());
return AutoCompleteResultsMapper.map(result);
}

}
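Since `DataPlatformInstanceType` now also implements `SearchableEntityType` and wires `autoComplete` through `_entityClient`, platform instances become reachable from the GraphQL autocomplete API. A minimal sketch of such a request, assuming a local DataHub deployment; the host, the token, and the exact `AutoCompleteInput`/`AutoCompleteResults` field names are assumptions based on this diff, not verified against a released schema:

```python
import requests

GRAPHQL_URL = "http://localhost:9002/api/graphql"  # assumed local deployment
TOKEN = "<personal-access-token>"  # hypothetical placeholder

AUTOCOMPLETE = """
query autocompletePlatformInstances($input: AutoCompleteInput!) {
  autoComplete(input: $input) {
    query
    suggestions
  }
}
"""

resp = requests.post(
    GRAPHQL_URL,
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "query": AUTOCOMPLETE,
        "variables": {
            # DATA_PLATFORM_INSTANCE is the entity type this diff registers.
            "input": {"type": "DATA_PLATFORM_INSTANCE", "query": "prod", "limit": 5}
        },
    },
)
print(resp.json()["data"]["autoComplete"]["suggestions"])
```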
10 changes: 10 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
@@ -226,6 +226,11 @@ type Query {
listOwnershipTypes(
"Input required for listing custom ownership types"
input: ListOwnershipTypesInput!): ListOwnershipTypesResult!

"""
Fetch a Data Platform Instance by primary key (urn)
"""
dataPlatformInstance(urn: String!): DataPlatformInstance
}

"""
@@ -3783,6 +3788,11 @@ type CorpGroup implements Entity {
Additional read only info about the group
"""
info: CorpGroupInfo @deprecated

"""
Whether or not this entity exists on DataHub
"""
exists: Boolean
}

"""
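The two `entity.graphql` additions above (a top-level `dataPlatformInstance` lookup and the `exists` flag on `CorpGroup`) can be exercised together. A hedged sketch against a running instance; the host, token, and both URNs are illustrative placeholders:

```python
import requests

GRAPHQL_URL = "http://localhost:9002/api/graphql"  # assumed local deployment
TOKEN = "<personal-access-token>"  # hypothetical placeholder

QUERY = """
query {
  dataPlatformInstance(urn: "urn:li:dataPlatformInstance:(urn:li:dataPlatform:kafka,prod_cluster)") {
    urn
    type
  }
  corpGroup(urn: "urn:li:corpGroup:engineering") {
    urn
    exists
  }
}
"""

resp = requests.post(
    GRAPHQL_URL,
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"query": QUERY},
)
# An `exists: false` result is what GroupProfile.tsx below uses to render
# the NonExistentEntityPage.
print(resp.json())
```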
4 changes: 4 additions & 0 deletions datahub-web-react/src/app/entity/group/GroupProfile.tsx
@@ -11,6 +11,7 @@ import { RoutedTabs } from '../../shared/RoutedTabs';
import GroupInfoSidebar from './GroupInfoSideBar';
import { GroupAssets } from './GroupAssets';
import { ErrorSection } from '../../shared/error/ErrorSection';
import NonExistentEntityPage from '../shared/entity/NonExistentEntityPage';

const messageStyle = { marginTop: '10%' };

@@ -110,6 +111,9 @@ export default function GroupProfile() {
urn,
};

if (data?.corpGroup?.exists === false) {
return <NonExistentEntityPage />;
}
return (
<>
{error && <ErrorSection />}
1 change: 1 addition & 0 deletions datahub-web-react/src/graphql/group.graphql
@@ -3,6 +3,7 @@ query getGroup($urn: String!, $membersCount: Int!) {
urn
type
name
exists
origin {
type
externalType
2 changes: 2 additions & 0 deletions docker/mariadb/init.sql
@@ -28,3 +28,5 @@ insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, creat
now(),
'urn:li:corpuser:__datahub_system'
);

DROP TABLE IF EXISTS metadata_index;
2 changes: 2 additions & 0 deletions docker/mysql-setup/init.sql
@@ -39,3 +39,5 @@ INSERT INTO metadata_aspect_v2
SELECT * FROM temp_metadata_aspect_v2
WHERE NOT EXISTS (SELECT * from metadata_aspect_v2);
DROP TABLE temp_metadata_aspect_v2;

DROP TABLE IF EXISTS metadata_index;
2 changes: 2 additions & 0 deletions docker/mysql/init.sql
@@ -27,3 +27,5 @@ INSERT INTO metadata_aspect_v2 (urn, aspect, version, metadata, createdon, creat
now(),
'urn:li:corpuser:__datahub_system'
);

DROP TABLE IF EXISTS metadata_index;
2 changes: 2 additions & 0 deletions docker/postgres-setup/init.sql
@@ -35,3 +35,5 @@ INSERT INTO metadata_aspect_v2
SELECT * FROM temp_metadata_aspect_v2
WHERE NOT EXISTS (SELECT * from metadata_aspect_v2);
DROP TABLE temp_metadata_aspect_v2;

DROP TABLE IF EXISTS metadata_index;
2 changes: 2 additions & 0 deletions docker/postgres/init.sql
@@ -28,3 +28,5 @@ insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, creat
now(),
'urn:li:corpuser:__datahub_system'
);

DROP TABLE IF EXISTS metadata_index;
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -7,6 +7,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes

- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
- #8942 - Removed `urn:li:corpuser:datahub` owner for the `Measure`, `Dimension` and `Temporal` tags emitted
by Looker and LookML source connectors.
- #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details.
- #8853 - Introduced the Airflow plugin v2. If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`.
- #8943 - The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled.
@@ -3,6 +3,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@@ -35,4 +36,20 @@ public Set<String> getOwners() {
}
return fieldResolvers.get(ResourceFieldType.OWNER).getFieldValuesFuture().join().getValues();
}

/**
* Fetch the platform instance for a Resolved Resource Spec
* @return a Platform Instance or null if one does not exist.
*/
@Nullable
public String getDataPlatformInstance() {
if (!fieldResolvers.containsKey(ResourceFieldType.DATA_PLATFORM_INSTANCE)) {
return null;
}
Set<String> dataPlatformInstance = fieldResolvers.get(ResourceFieldType.DATA_PLATFORM_INSTANCE).getFieldValuesFuture().join().getValues();
if (dataPlatformInstance.size() > 0) {
return dataPlatformInstance.stream().findFirst().get();
}
return null;
}
}
@@ -19,5 +19,9 @@ public enum ResourceFieldType {
/**
* Domains of resource
*/
DOMAIN
DOMAIN,
/**
* Data platform instance of resource
*/
DATA_PLATFORM_INSTANCE
}
9 changes: 9 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -2,6 +2,7 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Generic, Iterable, Optional, Tuple, TypeVar

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import set_dataset_urn_to_lower
from datahub.ingestion.api.committable import Committable
from datahub.ingestion.graph.client import DataHubGraph
@@ -75,3 +76,11 @@ def register_checkpointer(self, committable: Committable) -> None:

def get_committables(self) -> Iterable[Tuple[str, Committable]]:
yield from self.checkpointers.items()

def require_graph(self, operation: Optional[str] = None) -> DataHubGraph:
if not self.graph:
raise ConfigurationError(
f"{operation or 'This operation'} requires a graph, but none was provided. "
"To provide one, either use the datahub-rest sink or set the top-level datahub_api config in the recipe."
)
return self.graph
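The new `require_graph` helper gives sources a single place to fail fast when a recipe lacks a graph connection. A minimal usage sketch; the helper function and URN argument below are hypothetical, while `require_graph` comes from the diff above and `get_aspect` is the standard `DataHubGraph` read API:

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.metadata.schema_classes import UpstreamLineageClass


def fetch_existing_lineage(ctx: PipelineContext, urn: str):
    # Hypothetical helper: require_graph raises ConfigurationError with a
    # recipe hint when no graph is configured, instead of failing later
    # with an AttributeError on a None ctx.graph.
    graph = ctx.require_graph("Lineage merging")
    return graph.get_aspect(urn, UpstreamLineageClass)
```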
@@ -0,0 +1,139 @@
import copy
from typing import Dict, Iterable, Optional

from datahub.emitter.mce_builder import datahub_guid, set_aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
FineGrainedLineageClass,
MetadataChangeEventClass,
SystemMetadataClass,
UpstreamClass,
UpstreamLineageClass,
)
from datahub.specific.dataset import DatasetPatchBuilder


def _convert_upstream_lineage_to_patch(
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
patch_builder = DatasetPatchBuilder(urn, system_metadata)
for upstream in aspect.upstreams:
patch_builder.add_upstream_lineage(upstream)
mcp = next(iter(patch_builder.build()))
return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp)


def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
return datahub_guid(
{
"upstreams": sorted(fine_upstream.upstreams or []),
"downstreams": sorted(fine_upstream.downstreams or []),
"transformOperation": fine_upstream.transformOperation,
}
)


def _merge_upstream_lineage(
new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass
) -> UpstreamLineageClass:
merged_aspect = copy.deepcopy(gms_aspect)

upstreams_map: Dict[str, UpstreamClass] = {
upstream.dataset: upstream for upstream in merged_aspect.upstreams
}

upstreams_updated = False
fine_upstreams_updated = False

for table_upstream in new_aspect.upstreams:
if table_upstream.dataset not in upstreams_map or (
table_upstream.auditStamp.time
> upstreams_map[table_upstream.dataset].auditStamp.time
):
upstreams_map[table_upstream.dataset] = table_upstream
upstreams_updated = True

if upstreams_updated:
merged_aspect.upstreams = list(upstreams_map.values())

if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages:
fine_upstreams_map: Dict[str, FineGrainedLineageClass] = {
get_fine_grained_lineage_key(fine_upstream): fine_upstream
for fine_upstream in merged_aspect.fineGrainedLineages
}
for column_upstream in new_aspect.fineGrainedLineages:
column_upstream_key = get_fine_grained_lineage_key(column_upstream)

if column_upstream_key not in fine_upstreams_map or (
column_upstream.confidenceScore
> fine_upstreams_map[column_upstream_key].confidenceScore
):
fine_upstreams_map[column_upstream_key] = column_upstream
fine_upstreams_updated = True

if fine_upstreams_updated:
merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values())
else:
merged_aspect.fineGrainedLineages = (
new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages
)

return merged_aspect


def _lineage_wu_via_read_modify_write(
graph: Optional[DataHubGraph],
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
if gms_aspect:
new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
else:
new_aspect = aspect

return MetadataChangeProposalWrapper(
entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata
).as_workunit()


def auto_incremental_lineage(
graph: Optional[DataHubGraph],
incremental_lineage: bool,
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
if not incremental_lineage:
yield from stream
return # early exit

for wu in stream:
lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type(
UpstreamLineageClass
)
urn = wu.get_urn()

if lineage_aspect:
if isinstance(wu.metadata, MetadataChangeEventClass):
set_aspect(
wu.metadata, None, UpstreamLineageClass
) # we'll emit upstreamLineage separately below
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu
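The file above implements incremental lineage: table-level-only `upstreamLineage` aspects are rewritten as patch proposals, while aspects carrying fine-grained (column) lineage are merged with whatever GMS already holds, keeping the newer upstream per dataset and the higher-confidence entry per fine-grained key. A hedged sketch of wrapping a workunit stream with it; the module path is an assumption, since this rendering omits the new file's name, and the wiring is illustrative:

```python
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph

# Module path assumed; the rendered diff does not show where the new file lives.
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage


def wrap_stream(
    graph: DataHubGraph, stream: Iterable[MetadataWorkUnit]
) -> Iterable[MetadataWorkUnit]:
    # With incremental_lineage=True, lineage workunits are converted to patches
    # (table-level only) or read-modify-write merges (fine-grained lineage);
    # all other workunits pass through unchanged.
    yield from auto_incremental_lineage(graph, True, stream)


# Sources would typically apply this as a stream processor, e.g. via
# functools.partial(auto_incremental_lineage, ctx.graph, config.incremental_lineage).
```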