Skip to content

Commit

Permalink
feat(browsepathv2): Allow system-update to reprocess browse paths v2 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Nov 8, 2023
1 parent 23c98ec commit 353584c
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
Expand All @@ -37,6 +39,8 @@
public class BackfillBrowsePathsV2Step implements UpgradeStep {

public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
public static final String REPROCESS_DEFAULT_BROWSE_PATHS_V2 = "REPROCESS_DEFAULT_BROWSE_PATHS_V2";
public static final String DEFAULT_BROWSE_PATH_V2 = "␟Default";

private static final Set<String> ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of(
Constants.DATASET_ENTITY_NAME,
Expand Down Expand Up @@ -81,37 +85,24 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {

private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId) {

// Condition: has `browsePaths` AND does NOT have `browsePathV2`
Criterion missingBrowsePathV2 = new Criterion();
missingBrowsePathV2.setCondition(Condition.IS_NULL);
missingBrowsePathV2.setField("browsePathV2");
// Excludes entities without browsePaths
Criterion hasBrowsePathV1 = new Criterion();
hasBrowsePathV1.setCondition(Condition.EXISTS);
hasBrowsePathV1.setField("browsePaths");

CriterionArray criterionArray = new CriterionArray();
criterionArray.add(missingBrowsePathV2);
criterionArray.add(hasBrowsePathV1);

ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);
final Filter filter;

ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(conjunctiveCriterion);

Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
if (System.getenv().containsKey(REPROCESS_DEFAULT_BROWSE_PATHS_V2)
&& Boolean.parseBoolean(System.getenv(REPROCESS_DEFAULT_BROWSE_PATHS_V2))) {
filter = backfillDefaultBrowsePathsV2Filter();
} else {
filter = backfillBrowsePathsV2Filter();
}

final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
"5m",
null,
BATCH_SIZE,
null
new SearchFlags().setFulltext(true).setSkipCache(true).setSkipHighlighting(true).setSkipAggregates(true)
);
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
Expand All @@ -129,6 +120,55 @@ private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, S
return scrollResult.getScrollId();
}

private Filter backfillBrowsePathsV2Filter() {
// Condition: has `browsePaths` AND does NOT have `browsePathV2`
Criterion missingBrowsePathV2 = new Criterion();
missingBrowsePathV2.setCondition(Condition.IS_NULL);
missingBrowsePathV2.setField("browsePathV2");
// Excludes entities without browsePaths
Criterion hasBrowsePathV1 = new Criterion();
hasBrowsePathV1.setCondition(Condition.EXISTS);
hasBrowsePathV1.setField("browsePaths");

CriterionArray criterionArray = new CriterionArray();
criterionArray.add(missingBrowsePathV2);
criterionArray.add(hasBrowsePathV1);

ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);

ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(conjunctiveCriterion);

Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
return filter;
}

private Filter backfillDefaultBrowsePathsV2Filter() {
// Condition: has default `browsePathV2`
Criterion hasDefaultBrowsePathV2 = new Criterion();
hasDefaultBrowsePathV2.setCondition(Condition.EQUAL);
hasDefaultBrowsePathV2.setField("browsePathV2");
StringArray values = new StringArray();
values.add(DEFAULT_BROWSE_PATH_V2);
hasDefaultBrowsePathV2.setValues(values);
hasDefaultBrowsePathV2.setValue(DEFAULT_BROWSE_PATH_V2); // not used, but required field?

CriterionArray criterionArray = new CriterionArray();
criterionArray.add(hasDefaultBrowsePathV2);

ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);

ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(conjunctiveCriterion);

Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
return filter;
}

private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true);
log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
Expand All @@ -142,7 +182,7 @@ private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exceptio
_entityService.ingestProposal(
proposal,
auditStamp,
false
true
);
}

Expand Down
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ DATAHUB_GMS_PORT=8080

ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true
REPROCESS_DEFAULT_BROWSE_PATHS_V2=${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}

# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ DATAHUB_GMS_PORT=8080

ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true
REPROCESS_DEFAULT_BROWSE_PATHS_V2=${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}

# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=
Expand Down
4 changes: 4 additions & 0 deletions docker/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ services:
- ${HOME}/.datahub/plugins:/etc/datahub/plugins
datahub-upgrade:
image: acryldata/datahub-upgrade:debug
ports:
- ${DATAHUB_MAPPED_UPGRADE_DEBUG_PORT:-5003}:5003
build:
context: datahub-upgrade
dockerfile: Dockerfile
Expand All @@ -63,6 +65,8 @@ services:
- SKIP_ELASTICSEARCH_CHECK=false
- DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-dev}
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true}
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=${REPROCESS_DEFAULT_BROWSE_PATHS_V2:-false}
- JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5003
volumes:
- ../datahub-upgrade/build/libs/:/datahub/datahub-upgrade/bin/
- ../metadata-models/src/main/resources/:/datahub/datahub-gms/resources
Expand Down
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-m1.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ services:
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
- REPROCESS_DEFAULT_BROWSE_PATHS_V2=false
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import org.javatuples.Quintet;
import org.javatuples.Septet;
import org.javatuples.Sextet;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
Expand Down Expand Up @@ -154,8 +154,9 @@ public SearchResult getCachedSearchResults(
batchSize,
querySize -> getRawSearchResults(entityNames, query, filters, sortCriterion, querySize.getFrom(),
querySize.getSize(), flags, facets),
querySize -> Sextet.with(entityNames, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, facets, querySize), flags, enableCache).getSearchResults(from, size);
querySize -> Septet.with(entityNames, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, flags != null ? toJsonString(flags) : null,
facets, querySize), flags, enableCache).getSearchResults(from, size);
}


Expand All @@ -175,7 +176,8 @@ public AutoCompleteResult getCachedAutoCompleteResults(
if (enableCache(flags)) {
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedAutoCompleteResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters != null ? toJsonString(filters) : null, limit);
Object cacheKey = Sextet.with(entityName, input, field, filters != null ? toJsonString(filters) : null,
flags != null ? toJsonString(flags) : null, limit);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(AutoCompleteResult.class, json) : null;
cacheAccess.stop();
Expand Down Expand Up @@ -210,7 +212,8 @@ public BrowseResult getCachedBrowseResults(
if (enableCache(flags)) {
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedBrowseResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters != null ? toJsonString(filters) : null, from, size);
Object cacheKey = Sextet.with(entityName, path, filters != null ? toJsonString(filters) : null,
flags != null ? toJsonString(flags) : null, from, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(BrowseResult.class, json) : null;
cacheAccess.stop();
Expand Down Expand Up @@ -247,9 +250,10 @@ public ScrollResult getCachedScrollResults(
ScrollResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "scroll_cache_access").time();
Object cacheKey = Sextet.with(entities, query,
Object cacheKey = Septet.with(entities, query,
filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null,
flags != null ? toJsonString(flags) : null,
scrollId, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(ScrollResult.class, json) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private AggregationMetadataArray transformIndexIntoEntityName(AggregationMetadat
@Nonnull
@WithSpan
private ScrollResult executeAndExtract(@Nonnull List<EntitySpec> entitySpecs, @Nonnull SearchRequest searchRequest, @Nullable Filter filter,
@Nullable String scrollId, @Nonnull String keepAlive, int size) {
@Nullable String scrollId, @Nullable String keepAlive, int size) {
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "executeAndExtract_scroll").time()) {
final SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// extract results, validated against document model as well
Expand All @@ -166,7 +166,7 @@ private ScrollResult executeAndExtract(@Nonnull List<EntitySpec> entitySpecs, @N
.extractScrollResult(searchResponse,
filter, scrollId, keepAlive, size, supportsPointInTime()));
} catch (Exception e) {
log.error("Search query failed", e);
log.error("Search query failed: {}", searchRequest, e);
throw new ESQueryException("Search query failed:", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ public SearchRequest getSearchRequest(@Nonnull String input, @Nullable Filter fi

BoolQueryBuilder filterQuery = getFilterQuery(filter);
searchSourceBuilder.query(QueryBuilders.boolQuery().must(getQuery(input, finalSearchFlags.isFulltext())).filter(filterQuery));
_aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation);
if (!finalSearchFlags.isSkipAggregates()) {
_aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation);
}
if (!finalSearchFlags.isSkipHighlighting()) {
searchSourceBuilder.highlighter(_highlights);
}
Expand Down Expand Up @@ -366,7 +368,7 @@ public SearchResult extractResult(@Nonnull SearchResponse searchResponse, Filter

@WithSpan
public ScrollResult extractScrollResult(@Nonnull SearchResponse searchResponse, Filter filter, @Nullable String scrollId,
@Nonnull String keepAlive, int size, boolean supportsPointInTime) {
@Nullable String keepAlive, int size, boolean supportsPointInTime) {
int totalCount = (int) searchResponse.getHits().getTotalHits().value;
List<SearchEntity> resultList = getResults(searchResponse);
SearchResultMetadata searchResultMetadata = extractSearchResultMetadata(searchResponse, filter);
Expand All @@ -376,7 +378,7 @@ public ScrollResult extractScrollResult(@Nonnull SearchResponse searchResponse,
if (searchHits.length == size) {
Object[] sort = searchHits[searchHits.length - 1].getSortValues();
long expirationTimeMs = 0L;
if (supportsPointInTime) {
if (keepAlive != null && supportsPointInTime) {
expirationTimeMs = TimeValue.parseTimeValue(keepAlive, "expirationTime").getMillis() + System.currentTimeMillis();
}
nextScrollId = new SearchAfterWrapper(sort, searchResponse.pointInTimeId(), expirationTimeMs).toScrollId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ bootstrap:
enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones
backfillBrowsePathsV2:
enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag. Deprecating in favor of running through SystemUpdate
reprocessDefaultBrowsePathsV2:
enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false} # reprocess V2 browse paths which were set to the default: {"path":[{"id":"Default"}]}
policies:
file: ${BOOTSTRAP_POLICIES_FILE:classpath:boot/policies.json}
# eg for local file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable
*/
@Nonnull
ScrollResult fullTextScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags);
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags);

/**
* Gets a list of documents that match given search request. The results are aggregated and filters are applied to the
Expand All @@ -210,7 +210,7 @@ ScrollResult fullTextScroll(@Nonnull List<String> entities, @Nonnull String inpu
*/
@Nonnull
ScrollResult structuredScroll(@Nonnull List<String> entities, @Nonnull String input, @Nullable Filter postFilters,
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags);
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags);

/**
* Max result size returned by the underlying search backend
Expand Down

0 comments on commit 353584c

Please sign in to comment.