Skip to content

Commit

Permalink
feat(browseV2): add browseV2 logic to system update (#8506)
Browse files Browse the repository at this point in the history
Co-authored-by: david-leifker <[email protected]>
  • Loading branch information
2 people authored and yoonhyejin committed Aug 24, 2023
1 parent 12c472d commit 5f719b0
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class BackfillBrowsePathsV2Config {

@Bean
public BackfillBrowsePathsV2 backfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
return new BackfillBrowsePathsV2(entityService, searchService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
Expand Down Expand Up @@ -30,10 +31,11 @@ public class SystemUpdateConfig {
@Bean(name = "systemUpdate")
public SystemUpdate systemUpdate(final BuildIndices buildIndices, final CleanIndices cleanIndices,
@Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer,
final GitVersion gitVersion, @Qualifier("revision") String revision) {
final GitVersion gitVersion, @Qualifier("revision") String revision,
final BackfillBrowsePathsV2 backfillBrowsePathsV2) {

String version = String.format("%s-%s", gitVersion.getVersion(), revision);
return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version);
return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version, backfillBrowsePathsV2);
}

@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -21,11 +22,12 @@ public class SystemUpdate implements Upgrade {
private final List<UpgradeStep> _steps;

public SystemUpdate(final BuildIndices buildIndicesJob, final CleanIndices cleanIndicesJob,
final KafkaEventProducer kafkaEventProducer, final String version) {
final KafkaEventProducer kafkaEventProducer, final String version,
final BackfillBrowsePathsV2 backfillBrowsePathsV2) {

_preStartupUpgrades = List.of(buildIndicesJob);
_steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
_postStartupUpgrades = List.of(cleanIndicesJob);
_postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkedin.datahub.upgrade.system.entity.steps;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import java.util.List;


public class BackfillBrowsePathsV2 implements Upgrade {

private final List<UpgradeStep> _steps;

public BackfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
_steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService));
}

@Override
public String id() {
return "BackfillBrowsePathsV2";
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package com.linkedin.datahub.upgrade.system.entity.steps;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

import java.util.Set;

import static com.linkedin.metadata.Constants.*;


@Slf4j
public class BackfillBrowsePathsV2Step implements UpgradeStep {

public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";

private static final Set<String> ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of(
Constants.DATASET_ENTITY_NAME,
Constants.DASHBOARD_ENTITY_NAME,
Constants.CHART_ENTITY_NAME,
Constants.DATA_JOB_ENTITY_NAME,
Constants.DATA_FLOW_ENTITY_NAME,
Constants.ML_MODEL_ENTITY_NAME,
Constants.ML_MODEL_GROUP_ENTITY_NAME,
Constants.ML_FEATURE_TABLE_ENTITY_NAME,
Constants.ML_FEATURE_ENTITY_NAME
);
private static final Integer BATCH_SIZE = 5000;

private final EntityService _entityService;
private final SearchService _searchService;

public BackfillBrowsePathsV2Step(EntityService entityService, SearchService searchService) {
_searchService = searchService;
_entityService = entityService;
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
final AuditStamp auditStamp =
new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());

String scrollId = null;
for (String entityType : ENTITY_TYPES_TO_MIGRATE) {
int migratedCount = 0;
do {
log.info(String.format("Upgrading batch %s-%s of browse paths for entity type %s", migratedCount,
migratedCount + BATCH_SIZE, entityType));
scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
migratedCount += BATCH_SIZE;
} while (scrollId != null);
}
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}

private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId) {

// Condition: has `browsePaths` AND does NOT have `browsePathV2`
Criterion missingBrowsePathV2 = new Criterion();
missingBrowsePathV2.setCondition(Condition.IS_NULL);
missingBrowsePathV2.setField("browsePathV2");
// Excludes entities without browsePaths
Criterion hasBrowsePathV1 = new Criterion();
hasBrowsePathV1.setCondition(Condition.EXISTS);
hasBrowsePathV1.setField("browsePaths");

CriterionArray criterionArray = new CriterionArray();
criterionArray.add(missingBrowsePathV2);
criterionArray.add(hasBrowsePathV1);

ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);

ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(conjunctiveCriterion);

Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);

final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
"5m",
BATCH_SIZE,
null
);
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
}

for (SearchEntity searchEntity : scrollResult.getEntities()) {
try {
ingestBrowsePathsV2(searchEntity.getEntity(), auditStamp);
} catch (Exception e) {
// don't stop the whole step because of one bad urn or one bad ingestion
log.error(String.format("Error ingesting default browsePathsV2 aspect for urn %s", searchEntity.getEntity()), e);
}
}

return scrollResult.getScrollId();
}

private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true);
log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
proposal.setEntityType(urn.getEntityType());
proposal.setAspectName(Constants.BROWSE_PATHS_V2_ASPECT_NAME);
proposal.setChangeType(ChangeType.UPSERT);
proposal.setSystemMetadata(new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
_entityService.ingestProposal(
proposal,
auditStamp,
false
);
}

@Override
public String id() {
return "BackfillBrowsePathsV2Step";
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum retries.
*/
@Override
public boolean isOptional() {
return true;
}

@Override
public boolean skip(UpgradeContext context) {
return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
}
}

1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true

# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
BACKFILL_BROWSE_PATHS_V2=true

# Uncomment and set these to support SSL connection to Elasticsearch
# ELASTICSEARCH_USE_SSL=
Expand Down
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-m1.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- BACKFILL_BROWSE_PATHS_V2=true
hostname: datahub-upgrade
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ bootstrap:
upgradeDefaultBrowsePaths:
enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones
backfillBrowsePathsV2:
enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag.
enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag. Deprecating in favor of running through SystemUpdate

systemUpdate:
initialBackOffMs: ${BOOTSTRAP_SYSTEM_UPDATE_INITIAL_BACK_OFF_MILLIS:5000}
Expand Down

0 comments on commit 5f719b0

Please sign in to comment.