Skip to content

Commit

Permalink
feat(entity-client): fix entity client cache and prevent circular dep
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Sep 26, 2023
1 parent 0a869dd commit 3fdce5d
Show file tree
Hide file tree
Showing 38 changed files with 408 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static List<ReindexConfig> getAllReindexConfigs(List<ElasticSearchIndexed
List<ReindexConfig> reindexConfigs = new ArrayList<>(_reindexConfigs);
if (reindexConfigs.isEmpty()) {
for (ElasticSearchIndexed elasticSearchIndexed : elasticSearchIndexedList) {
reindexConfigs.addAll(elasticSearchIndexed.getReindexConfigs());
reindexConfigs.addAll(elasticSearchIndexed.buildReindexConfigs());
}
_reindexConfigs = new ArrayList<>(reindexConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import io.ebean.Database;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
Expand Down Expand Up @@ -35,4 +36,7 @@ public class UpgradeCliApplicationTestConfiguration {

@MockBean
ConfigEntityRegistry configEntityRegistry;

@MockBean
public EntityIndexBuilders entityIndexBuilders;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.ebean.PagedList;
import io.ebean.Transaction;

import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Timestamp;
Expand Down Expand Up @@ -103,6 +104,9 @@ Integer countAspect(
@Nonnull
PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args);

@Nonnull
Stream<EntityAspect> streamAspects(String entityName, String aspectName);

int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn);

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.codahale.metrics.Timer;
import com.linkedin.data.template.GetMode;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.config.PreProcessHooks;
import com.datahub.util.RecordUtils;
import com.datahub.util.exception.ModelConversionException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import javax.persistence.EntityNotFoundException;

import io.ebean.Transaction;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.*;
Expand Down Expand Up @@ -144,11 +146,11 @@ public class EntityServiceImpl implements EntityService {
private final Map<String, Set<String>> _entityToValidAspects;
private RetentionService _retentionService;
private final Boolean _alwaysEmitChangeLog;
@Getter
private final UpdateIndicesService _updateIndicesService;
private final PreProcessHooks _preProcessHooks;
protected static final int MAX_KEYS_PER_QUERY = 500;


private final Integer ebeanMaxTransactionRetry;

public EntityServiceImpl(
Expand Down Expand Up @@ -180,6 +182,11 @@ public EntityServiceImpl(
ebeanMaxTransactionRetry = retry != null ? retry : DEFAULT_MAX_TRANSACTION_RETRY;
}

@Override
public void setSystemEntityClient(SystemEntityClient systemEntityClient) {
this._updateIndicesService.setSystemEntityClient(systemEntityClient);
}

/**
* Retrieves the latest aspects corresponding to a batch of {@link Urn}s based on a provided
* set of aspect names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -445,6 +446,12 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
return null;
}

@Nonnull
@Override
public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
// Not implemented
return null;
}

@Override
@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -433,6 +434,18 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
.findPagedList();
}

@Override
@Nonnull
public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
ExpressionList<EbeanAspectV2> exp = _server.find(EbeanAspectV2.class)
.select(EbeanAspectV2.ALL_COLUMNS)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION)
.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName)
.like(EbeanAspectV2.URN_COLUMN, "urn:li:" + entityName + ":%");
return exp.query().findStream().map(EbeanAspectV2::toEntityAspect);
}

@Override
@Nonnull
public Iterable<String> listAllUrns(int start, int pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void removeEdgesFromNode(
public void configure() {
log.info("Setting up elastic graph index");
try {
for (ReindexConfig config : getReindexConfigs()) {
for (ReindexConfig config : buildReindexConfigs()) {
_indexBuilder.buildIndex(config);
}
} catch (IOException e) {
Expand All @@ -327,7 +327,7 @@ public void configure() {
}

@Override
public List<ReindexConfig> getReindexConfigs() throws IOException {
public List<ReindexConfig> buildReindexConfigs() throws IOException {
return List.of(_indexBuilder.buildReindexState(_indexConvention.getIndexName(INDEX_NAME),
GraphRelationshipMappingsBuilder.getMappings(), Collections.emptyMap()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public void configure() {
}

@Override
public List<ReindexConfig> getReindexConfigs() {
return indexBuilders.getReindexConfigs();
public List<ReindexConfig> buildReindexConfigs() {
return indexBuilders.buildReindexConfigs();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,7 @@ public void buildIndex(ReindexConfig indexState) throws IOException {
// no need to reindex and only new mappings or dynamic settings

// Just update the additional mappings
if (indexState.isPureMappingsAddition()) {
log.info("Updating index {} mappings in place.", indexState.name());
PutMappingRequest request = new PutMappingRequest(indexState.name()).source(indexState.targetMappings());
_searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexState.name());
}
applyMappings(indexState, true);

if (indexState.requiresApplySettings()) {
UpdateSettingsRequest request = new UpdateSettingsRequest(indexState.name());
Expand All @@ -234,6 +229,26 @@ public void buildIndex(ReindexConfig indexState) throws IOException {
}
}

/**
* Apply mappings changes if reindex is not required
* @param indexState the state of the current and target index settings/mappings
* @param suppressError during reindex logic this is not an error, for structured properties it is an error
* @throws IOException communication issues with ES
*/
public void applyMappings(ReindexConfig indexState, boolean suppressError) throws IOException {
if (indexState.isPureMappingsAddition()) {
log.info("Updating index {} mappings in place.", indexState.name());
PutMappingRequest request = new PutMappingRequest(indexState.name()).source(indexState.targetMappings());
_searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexState.name());
} else {
if (!suppressError) {
log.error("Attempted to apply invalid mappings. Current: {} Target: {}", indexState.currentMappings(),
indexState.targetMappings());
}
}
}

public String reindexInPlaceAsync(String indexAlias, @Nullable QueryBuilder filterQuery, BatchWriteOperationsOptions options, ReindexConfig config)
throws Exception {
GetAliasesResponse aliasesResponse = _searchClient.indices().getAlias(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
Expand All @@ -14,32 +16,37 @@
@RequiredArgsConstructor
@Slf4j
public class EntityIndexBuilders implements ElasticSearchIndexed {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;

@Override
public void reindexAll() {
for (ReindexConfig config : getReindexConfigs()) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public List<ReindexConfig> getReindexConfigs() {
return entityRegistry.getEntitySpecs().values().stream().flatMap(entitySpec -> {
try {
return new EntityIndexBuilder(indexBuilder, entitySpec, settingsBuilder, indexConvention.getIndexName(entitySpec))
.getReindexConfigs().stream();
} catch (IOException e) {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;

public ESIndexBuilder getIndexBuilder() {
return indexBuilder;
}

@Override
public void reindexAll() {
for (ReindexConfig config : buildReindexConfigs()) {
try {
indexBuilder.buildIndex(config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public List<ReindexConfig> buildReindexConfigs() {
Map<String, Object> settings = settingsBuilder.getSettings();
return entityRegistry.getEntitySpecs().values().stream().map(entitySpec -> {
try {
Map<String, Object> mappings = MappingsBuilder.getMappings(entitySpec);
return indexBuilder.buildReindexState(indexConvention.getIndexName(entitySpec), mappings, settings);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
).collect(Collectors.toList());
}
}
).collect(Collectors.toList());
}
}
Loading

0 comments on commit 3fdce5d

Please sign in to comment.