Skip to content

Commit

Permalink
misc: datahub-upgrade improvements, aspect key & default aspects fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Feb 7, 2024
1 parent 20b9050 commit 54841e9
Show file tree
Hide file tree
Showing 25 changed files with 451 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import com.linkedin.gms.factory.auth.AuthorizerChainFactory;
import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory;
import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -24,7 +28,11 @@
classes = {
ScheduledAnalyticsFactory.class,
AuthorizerChainFactory.class,
DataHubAuthorizerFactory.class
DataHubAuthorizerFactory.class,
SimpleKafkaConsumerFactory.class,
KafkaEventConsumerFactory.class,
InternalSchemaRegistryFactory.class,
GraphQLEngineFactory.class
})
})
public class UpgradeCliApplication {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -11,7 +12,12 @@ public class BackfillBrowsePathsV2Config {

@Bean
public BackfillBrowsePathsV2 backfillBrowsePathsV2(
EntityService<?> entityService, SearchService searchService) {
return new BackfillBrowsePathsV2(entityService, searchService);
EntityService<?> entityService,
SearchService searchService,
@Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled,
@Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled,
@Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) {
return new BackfillBrowsePathsV2(
entityService, searchService, enabled, reprocessEnabled, batchSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@

import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ReindexDataJobViaNodesCLLConfig {

@Bean
public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(EntityService<?> entityService) {
return new ReindexDataJobViaNodesCLL(entityService);
public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(
EntityService<?> entityService,
@Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize) {
return new ReindexDataJobViaNodesCLL(entityService, enabled, batchSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.linkedin.datahub.upgrade.config;

import org.springframework.boot.ApplicationArguments;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class SystemUpdateCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
return context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream()
.anyMatch("SystemUpdate"::equals);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
Expand All @@ -21,9 +22,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Slf4j
@Configuration
Expand Down Expand Up @@ -74,4 +78,23 @@ protected KafkaEventProducer duheKafkaEventProducer(
duheSchemaRegistryConfig, kafkaConfiguration, properties));
return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker);
}

/**
* The ReindexDataJobViaNodesCLLConfig step requires publishing to MCL. Overriding the default
* producer with this special producer which doesn't require an active registry.
*
* <p>Use when INTERNAL registry and is SYSTEM_UPDATE
*
* <p>This forces this producer into the EntityService
*/
@Primary
@Bean(name = "kafkaEventProducer")
@Conditional(SystemUpdateCondition.class)
@ConditionalOnProperty(
name = "kafka.schemaRegistry.type",
havingValue = InternalSchemaRegistryFactory.TYPE)
protected KafkaEventProducer kafkaEventProducer(
@Qualifier("duheKafkaEventProducer") KafkaEventProducer kafkaEventProducer) {
return kafkaEventProducer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,20 @@ public class BackfillBrowsePathsV2 implements Upgrade {

private final List<UpgradeStep> _steps;

public BackfillBrowsePathsV2(EntityService<?> entityService, SearchService searchService) {
_steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService));
public BackfillBrowsePathsV2(
EntityService<?> entityService,
SearchService searchService,
boolean enabled,
boolean reprocessEnabled,
Integer batchSize) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillBrowsePathsV2Step(
entityService, searchService, reprocessEnabled, batchSize));
} else {
_steps = ImmutableList.of();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
Expand All @@ -37,9 +38,8 @@
@Slf4j
public class BackfillBrowsePathsV2Step implements UpgradeStep {

public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
public static final String REPROCESS_DEFAULT_BROWSE_PATHS_V2 =
"REPROCESS_DEFAULT_BROWSE_PATHS_V2";
private static final String UPGRADE_ID = "BackfillBrowsePathsV2Step";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
public static final String DEFAULT_BROWSE_PATH_V2 = "␟Default";

private static final Set<String> ENTITY_TYPES_TO_MIGRATE =
Expand All @@ -53,14 +53,22 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep {
Constants.ML_MODEL_GROUP_ENTITY_NAME,
Constants.ML_FEATURE_TABLE_ENTITY_NAME,
Constants.ML_FEATURE_ENTITY_NAME);
private static final Integer BATCH_SIZE = 5000;

private final EntityService<?> _entityService;
private final SearchService _searchService;

public BackfillBrowsePathsV2Step(EntityService<?> entityService, SearchService searchService) {
_searchService = searchService;
_entityService = entityService;
private final EntityService<?> entityService;
private final SearchService searchService;

private final boolean reprocessEnabled;
private final Integer batchSize;

public BackfillBrowsePathsV2Step(
EntityService<?> entityService,
SearchService searchService,
boolean reprocessEnabled,
Integer batchSize) {
this.searchService = searchService;
this.entityService = entityService;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
}

@Override
Expand All @@ -78,11 +86,14 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
log.info(
String.format(
"Upgrading batch %s-%s of browse paths for entity type %s",
migratedCount, migratedCount + BATCH_SIZE, entityType));
migratedCount, migratedCount + batchSize, entityType));
scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
migratedCount += BATCH_SIZE;
migratedCount += batchSize;
} while (scrollId != null);
}

BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);

return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
Expand All @@ -91,27 +102,27 @@ private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, S

final Filter filter;

if (System.getenv().containsKey(REPROCESS_DEFAULT_BROWSE_PATHS_V2)
&& Boolean.parseBoolean(System.getenv(REPROCESS_DEFAULT_BROWSE_PATHS_V2))) {
if (reprocessEnabled) {
filter = backfillDefaultBrowsePathsV2Filter();
} else {
filter = backfillBrowsePathsV2Filter();
}

final ScrollResult scrollResult =
_searchService.scrollAcrossEntities(
searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
null,
BATCH_SIZE,
batchSize,
new SearchFlags()
.setFulltext(true)
.setSkipCache(true)
.setSkipHighlighting(true)
.setSkipAggregates(true));

if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
}
Expand Down Expand Up @@ -183,7 +194,7 @@ private Filter backfillDefaultBrowsePathsV2Filter() {

private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
BrowsePathsV2 browsePathsV2 =
DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, _entityService);
DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, entityService);
log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
Expand All @@ -193,12 +204,12 @@ private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exceptio
proposal.setSystemMetadata(
new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
_entityService.ingestProposal(proposal, auditStamp, true);
entityService.ingestProposal(proposal, auditStamp, true);
}

@Override
public String id() {
return "BackfillBrowsePathsV2Step";
return UPGRADE_ID;
}

/**
Expand All @@ -211,7 +222,22 @@ public boolean isOptional() {
}

@Override
/**
* Returns whether the upgrade should be skipped. Uses previous run history or the environment
* variables REPROCESS_DEFAULT_BROWSE_PATHS_V2 & BACKFILL_BROWSE_PATHS_V2 to determine whether to
* skip.
*/
public boolean skip(UpgradeContext context) {
return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
boolean envEnabled = Boolean.parseBoolean(System.getenv("BACKFILL_BROWSE_PATHS_V2"));

if (reprocessEnabled && envEnabled) {
return false;
}

boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return (previouslyRun || !envEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ public class ReindexDataJobViaNodesCLL implements Upgrade {

private final List<UpgradeStep> _steps;

public ReindexDataJobViaNodesCLL(EntityService<?> entityService) {
_steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService));
public ReindexDataJobViaNodesCLL(
EntityService<?> entityService, boolean enabled, Integer batchSize) {
if (enabled) {
_steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService, batchSize));
} else {
_steps = ImmutableList.of();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import java.net.URISyntaxException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -21,12 +20,12 @@ public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {
private static final String UPGRADE_ID = "via-node-cll-reindex-datajob";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private static final Integer BATCH_SIZE = 5000;
private final EntityService<?> entityService;
private final Integer batchSize;

private final EntityService _entityService;

public ReindexDataJobViaNodesCLLStep(EntityService entityService) {
_entityService = entityService;
public ReindexDataJobViaNodesCLLStep(EntityService<?> entityService, Integer batchSize) {
this.entityService = entityService;
this.batchSize = batchSize;
}

@Override
Expand All @@ -35,17 +34,16 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.setAspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
.setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%");
.setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
.setBatchSize(batchSize);
RestoreIndicesResult result =
_entityService.restoreIndices(args, x -> context.report().addLine((String) x));
entityService.restoreIndices(args, x -> context.report().addLine((String) x));
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
try {
BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, _entityService);
context.report().addLine("State updated: " + UPGRADE_ID_URN);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
context.report().addLine("State updated: " + UPGRADE_ID_URN);

return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
Expand All @@ -70,7 +68,7 @@ public boolean isOptional() {
* variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT to determine whether to skip.
*/
public boolean skip(UpgradeContext context) {
boolean previouslyRun = _entityService.exists(UPGRADE_ID_URN, true);
boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
boolean envFlagRecommendsSkip =
Boolean.parseBoolean(System.getenv("SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT"));
if (previouslyRun) {
Expand Down
Loading

0 comments on commit 54841e9

Please sign in to comment.