From 5f719b086d723730194f8907895913c4e98650cc Mon Sep 17 00:00:00 2001
From: RyanHolstien
Date: Tue, 1 Aug 2023 12:48:02 -0500
Subject: [PATCH] feat(browseV2): add browseV2 logic to system update (#8506)

Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
---
 .../config/BackfillBrowsePathsV2Config.java   |  17 ++
 .../upgrade/config/SystemUpdateConfig.java    |   6 +-
 .../datahub/upgrade/system/SystemUpdate.java  |   6 +-
 .../entity/steps/BackfillBrowsePathsV2.java   |  28 +++
 .../steps/BackfillBrowsePathsV2Step.java      | 167 ++++++++++++++++++
 .../env/docker-without-neo4j.env              |   1 +
 docker/datahub-upgrade/env/docker.env         |   1 +
 .../docker-compose-m1.quickstart.yml          |   1 +
 ...er-compose-without-neo4j-m1.quickstart.yml |   1 +
 ...ocker-compose-without-neo4j.quickstart.yml |   1 +
 .../quickstart/docker-compose.quickstart.yml  |   1 +
 .../src/main/resources/application.yml        |   2 +-
 12 files changed, 227 insertions(+), 5 deletions(-)
 create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
 create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
 create mode 100644 datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java

diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
new file mode 100644
index 0000000000000..16e5e4247267f
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
@@ -0,0 +1,17 @@
+package com.linkedin.datahub.upgrade.config;
+
+import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.search.SearchService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+@Configuration
+public class BackfillBrowsePathsV2Config {
+
+  @Bean
+  public BackfillBrowsePathsV2 backfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
+    return new BackfillBrowsePathsV2(entityService, searchService);
+  }
+}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
index 826bd0180d037..9848fc7a0008f 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
@@ -3,6 +3,7 @@
 import com.linkedin.datahub.upgrade.system.SystemUpdate;
 import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
 import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
+import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
 import com.linkedin.gms.factory.common.TopicConventionFactory;
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
@@ -30,10 +31,11 @@ public class SystemUpdateConfig {
   @Bean(name = "systemUpdate")
   public SystemUpdate systemUpdate(final BuildIndices buildIndices, final CleanIndices cleanIndices,
       @Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer,
-      final GitVersion gitVersion, @Qualifier("revision") String revision) {
+      final GitVersion gitVersion, @Qualifier("revision") String revision,
+      final BackfillBrowsePathsV2 backfillBrowsePathsV2) {
 
     String version = String.format("%s-%s", gitVersion.getVersion(), revision);
-    return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version);
+    return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version, backfillBrowsePathsV2);
   }
 
   @Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java
index d7e490501a9fb..4a8211f2cd4ac 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java
@@ -6,6 +6,7 @@
 import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
 import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
 import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
+import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
 import com.linkedin.metadata.dao.producer.KafkaEventProducer;
 import lombok.extern.slf4j.Slf4j;
 
@@ -21,11 +22,12 @@ public class SystemUpdate implements Upgrade {
   private final List<UpgradeStep> _steps;
 
   public SystemUpdate(final BuildIndices buildIndicesJob, final CleanIndices cleanIndicesJob,
-      final KafkaEventProducer kafkaEventProducer, final String version) {
+      final KafkaEventProducer kafkaEventProducer, final String version,
+      final BackfillBrowsePathsV2 backfillBrowsePathsV2) {
 
     _preStartupUpgrades = List.of(buildIndicesJob);
     _steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
-    _postStartupUpgrades = List.of(cleanIndicesJob);
+    _postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2);
   }
 
   @Override
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
new file mode 100644
index 0000000000000..e213c0b2fd4de
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
@@ -0,0 +1,28 @@
+package com.linkedin.datahub.upgrade.system.entity.steps;
+
+import com.google.common.collect.ImmutableList;
+import com.linkedin.datahub.upgrade.Upgrade;
+import com.linkedin.datahub.upgrade.UpgradeStep;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.search.SearchService;
+import java.util.List;
+
+
+public class BackfillBrowsePathsV2 implements Upgrade {
+
+  private final List<UpgradeStep> _steps;
+
+  public BackfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
+    _steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService));
+  }
+
+  @Override
+  public String id() {
+    return "BackfillBrowsePathsV2";
+  }
+
+  @Override
+  public List<UpgradeStep> steps() {
+    return _steps;
+  }
+}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
new file mode 100644
index 0000000000000..7547186ccfb23
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
@@ -0,0 +1,167 @@
+package com.linkedin.datahub.upgrade.system.entity.steps;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.linkedin.common.AuditStamp;
+import com.linkedin.common.BrowsePathsV2;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.datahub.upgrade.UpgradeContext;
+import com.linkedin.datahub.upgrade.UpgradeStep;
+import com.linkedin.datahub.upgrade.UpgradeStepResult;
+import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.Constants;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.query.filter.Condition;
+import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
+import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
+import com.linkedin.metadata.query.filter.Criterion;
+import com.linkedin.metadata.query.filter.CriterionArray;
+import com.linkedin.metadata.query.filter.Filter;
+import com.linkedin.metadata.search.ScrollResult;
+import com.linkedin.metadata.search.SearchEntity;
+import com.linkedin.metadata.search.SearchService;
+import com.linkedin.metadata.utils.GenericRecordUtils;
+import com.linkedin.mxe.MetadataChangeProposal;
+import com.linkedin.mxe.SystemMetadata;
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Set;
+
+import static com.linkedin.metadata.Constants.*;
+
+
+@Slf4j
+public class BackfillBrowsePathsV2Step implements UpgradeStep {
+
+  public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
+
+  private static final Set<String> ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of(
+      Constants.DATASET_ENTITY_NAME,
+      Constants.DASHBOARD_ENTITY_NAME,
+      Constants.CHART_ENTITY_NAME,
+      Constants.DATA_JOB_ENTITY_NAME,
+      Constants.DATA_FLOW_ENTITY_NAME,
+      Constants.ML_MODEL_ENTITY_NAME,
+      Constants.ML_MODEL_GROUP_ENTITY_NAME,
+      Constants.ML_FEATURE_TABLE_ENTITY_NAME,
+      Constants.ML_FEATURE_ENTITY_NAME
+  );
+  private static final Integer BATCH_SIZE = 5000;
+
+  private final EntityService _entityService;
+  private final SearchService _searchService;
+
+  public BackfillBrowsePathsV2Step(EntityService entityService, SearchService searchService) {
+    _searchService = searchService;
+    _entityService = entityService;
+  }
+
+  @Override
+  public Function<UpgradeContext, UpgradeStepResult> executable() {
+    return (context) -> {
+      final AuditStamp auditStamp =
+          new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
+
+      String scrollId = null;
+      for (String entityType : ENTITY_TYPES_TO_MIGRATE) {
+        int migratedCount = 0;
+        do {
+          log.info(String.format("Upgrading batch %s-%s of browse paths for entity type %s", migratedCount,
+              migratedCount + BATCH_SIZE, entityType));
+          scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
+          migratedCount += BATCH_SIZE;
+        } while (scrollId != null);
+      }
+      return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
+    };
+  }
+
+  private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId) {
+
+    // Condition: has `browsePaths` AND does NOT have `browsePathV2`
+    Criterion missingBrowsePathV2 = new Criterion();
+    missingBrowsePathV2.setCondition(Condition.IS_NULL);
+    missingBrowsePathV2.setField("browsePathV2");
+    // Excludes entities without browsePaths
+    Criterion hasBrowsePathV1 = new Criterion();
+    hasBrowsePathV1.setCondition(Condition.EXISTS);
+    hasBrowsePathV1.setField("browsePaths");
+
+    CriterionArray criterionArray = new CriterionArray();
+    criterionArray.add(missingBrowsePathV2);
+    criterionArray.add(hasBrowsePathV1);
+
+    ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
+    conjunctiveCriterion.setAnd(criterionArray);
+
+    ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
+    conjunctiveCriterionArray.add(conjunctiveCriterion);
+
+    Filter filter = new Filter();
+    filter.setOr(conjunctiveCriterionArray);
+
+    final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
+        ImmutableList.of(entityType),
+        "*",
+        filter,
+        null,
+        scrollId,
+        "5m",
+        BATCH_SIZE,
+        null
+    );
+    if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
+      return null;
+    }
+
+    for (SearchEntity searchEntity : scrollResult.getEntities()) {
+      try {
+        ingestBrowsePathsV2(searchEntity.getEntity(), auditStamp);
+      } catch (Exception e) {
+        // don't stop the whole step because of one bad urn or one bad ingestion
+        log.error(String.format("Error ingesting default browsePathsV2 aspect for urn %s", searchEntity.getEntity()), e);
+      }
+    }
+
+    return scrollResult.getScrollId();
+  }
+
+  private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
+    BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true);
+    log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
+    MetadataChangeProposal proposal = new MetadataChangeProposal();
+    proposal.setEntityUrn(urn);
+    proposal.setEntityType(urn.getEntityType());
+    proposal.setAspectName(Constants.BROWSE_PATHS_V2_ASPECT_NAME);
+    proposal.setChangeType(ChangeType.UPSERT);
+    proposal.setSystemMetadata(new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
+    proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
+    _entityService.ingestProposal(
+        proposal,
+        auditStamp,
+        false
+    );
+  }
+
+  @Override
+  public String id() {
+    return "BackfillBrowsePathsV2Step";
+  }
+
+  /**
+   * Returns whether the upgrade should proceed if the step fails after exceeding the maximum retries.
+   */
+  @Override
+  public boolean isOptional() {
+    return true;
+  }
+
+  @Override
+  public boolean skip(UpgradeContext context) {
+    return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
+  }
+}
+
diff --git a/docker/datahub-upgrade/env/docker-without-neo4j.env b/docker/datahub-upgrade/env/docker-without-neo4j.env
index c253425897006..c399f71b7b15c 100644
--- a/docker/datahub-upgrade/env/docker-without-neo4j.env
+++ b/docker/datahub-upgrade/env/docker-without-neo4j.env
@@ -20,6 +20,7 @@ DATAHUB_GMS_HOST=datahub-gms
 DATAHUB_GMS_PORT=8080
 
 ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
+BACKFILL_BROWSE_PATHS_V2=true
 
 # Uncomment and set these to support SSL connection to Elasticsearch
 # ELASTICSEARCH_USE_SSL=
diff --git a/docker/datahub-upgrade/env/docker.env b/docker/datahub-upgrade/env/docker.env
index f8cf050e06154..491470406153b 100644
--- a/docker/datahub-upgrade/env/docker.env
+++ b/docker/datahub-upgrade/env/docker.env
@@ -24,6 +24,7 @@ DATAHUB_GMS_HOST=datahub-gms
 DATAHUB_GMS_PORT=8080
 
 ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
+BACKFILL_BROWSE_PATHS_V2=true
 
 # Uncomment and set these to support SSL connection to Elasticsearch
 # ELASTICSEARCH_USE_SSL=
diff --git a/docker/quickstart/docker-compose-m1.quickstart.yml b/docker/quickstart/docker-compose-m1.quickstart.yml
index 89569510bb001..0e6b06ec5de0b 100644
--- a/docker/quickstart/docker-compose-m1.quickstart.yml
+++ b/docker/quickstart/docker-compose-m1.quickstart.yml
@@ -146,6 +146,7 @@ services:
     - DATAHUB_GMS_HOST=datahub-gms
     - DATAHUB_GMS_PORT=8080
     - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
+    - BACKFILL_BROWSE_PATHS_V2=true
     hostname: datahub-upgrade
     image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
     labels:
diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
index 102500c8f1878..ba2df82ed9030 100644
--- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
+++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
@@ -139,6 +139,7 @@ services:
     - DATAHUB_GMS_HOST=datahub-gms
     - DATAHUB_GMS_PORT=8080
     - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
+    - BACKFILL_BROWSE_PATHS_V2=true
     hostname: datahub-upgrade
     image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
     labels:
diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
index 5fa2838255d9a..426e36d80f48c 100644
--- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
+++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml
@@ -139,6 +139,7 @@ services:
     - DATAHUB_GMS_HOST=datahub-gms
     - DATAHUB_GMS_PORT=8080
     - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
+    - BACKFILL_BROWSE_PATHS_V2=true
     hostname: datahub-upgrade
     image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
     labels:
diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml
index adee2da9276aa..837eaeb15007d 100644
--- a/docker/quickstart/docker-compose.quickstart.yml
+++ b/docker/quickstart/docker-compose.quickstart.yml
@@ -146,6 +146,7 @@ services:
     - DATAHUB_GMS_HOST=datahub-gms
     - DATAHUB_GMS_PORT=8080
     - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
+    - BACKFILL_BROWSE_PATHS_V2=true
     hostname: datahub-upgrade
     image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
     labels:
diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml
index 5290dd4adceac..dfc7218eb772d 100644
--- a/metadata-service/configuration/src/main/resources/application.yml
+++ b/metadata-service/configuration/src/main/resources/application.yml
@@ -270,7 +270,7 @@ bootstrap:
   upgradeDefaultBrowsePaths:
     enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones
   backfillBrowsePathsV2:
-    enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag.
+    enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag. Deprecating in favor of running through SystemUpdate
 
 systemUpdate:
   initialBackOffMs: ${BOOTSTRAP_SYSTEM_UPDATE_INITIAL_BACK_OFF_MILLIS:5000}