diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 8b625b3ae2289..fa9109689caad 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -769,8 +769,11 @@ public List batchIngestProposals( opContext.getValidationContext().isAlternateValidation()) .build(); + List results = entityService.ingestProposal(opContext, batch, async); + entitySearchService.appendRunId(opContext, results); + Map, List> resultMap = - entityService.ingestProposal(opContext, batch, async).stream() + results.stream() .collect( Collectors.groupingBy( result -> @@ -864,8 +867,7 @@ public void rollbackIngestion( private void tryIndexRunId( @Nonnull OperationContext opContext, Urn entityUrn, @Nullable SystemMetadata systemMetadata) { if (systemMetadata != null && systemMetadata.hasRunId()) { - entitySearchService.appendRunId( - opContext, entityUrn.getEntityType(), entityUrn, systemMetadata.getRunId()); + entitySearchService.appendRunId(opContext, entityUrn, systemMetadata.getRunId()); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 261ec127d5497..7f1d467dbd491 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -106,17 +106,17 @@ public void deleteDocument( @Override public void appendRunId( - @Nonnull OperationContext opContext, - @Nonnull String entityName, - @Nonnull Urn urn, - @Nullable String runId) { + @Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId) { final String docId = indexBuilders.getIndexConvention().getEntityDocumentId(urn); log.debug( - "Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId); + "Appending run id for entity name: {}, doc id: {}, run id: {}", + urn.getEntityType(), + docId, + runId); esWriteDAO.applyScriptUpdate( opContext, - entityName, + urn.getEntityType(), docId, /* Script used to apply updates to the runId field of the index. diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 676232d5b5c04..77a11757a7a8a 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -22,12 +22,14 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.aspect.EnvelopedAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.entity.validation.ValidationException; +import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.resources.operations.Utils; @@ -57,6 +59,8 @@ import java.time.Clock; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -320,15 +324,7 @@ private Task ingestProposals( List results = _entityService.ingestProposal(opContext, batch, asyncBool); - - for (IngestResult result : results) { - // Update runIds, only works for existing documents, so ES document must exist - Urn resultUrn = result.getUrn(); - - if (resultUrn != null && (result.isProcessedMCL() || result.isUpdate())) { - tryIndexRunId(opContext, resultUrn, result.getRequest().getSystemMetadata(), entitySearchService); - } - } + entitySearchService.appendRunId(opContext, results); // TODO: We don't actually use this return value anywhere. Maybe we should just stop returning it altogether? return RESTLI_SUCCESS; @@ -397,14 +393,4 @@ public Task restoreIndices( }, MetricRegistry.name(this.getClass(), "restoreIndices")); } - - private static void tryIndexRunId( - @Nonnull final OperationContext opContext, - final Urn urn, - final @Nullable SystemMetadata systemMetadata, - final EntitySearchService entitySearchService) { - if (systemMetadata != null && systemMetadata.hasRunId()) { - entitySearchService.appendRunId(opContext, urn.getEntityType(), urn, systemMetadata.getRunId()); - } - } } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index 26f2335d0c59f..92df9e6f427fa 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -1,15 +1,21 @@ package com.linkedin.metadata.search; import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.browse.BrowseResult; import com.linkedin.metadata.browse.BrowseResultV2; +import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.query.AutoCompleteResult; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opensearch.action.explain.ExplainResponse; @@ -59,15 +65,10 @@ void deleteDocument( /** * Appends a run id to the list for a certain document * - * @param entityName name of the entity * @param urn the urn of the user * @param runId the ID of the run */ - void appendRunId( - @Nonnull OperationContext opContext, - @Nonnull String entityName, - @Nonnull Urn urn, - @Nullable String runId); + void appendRunId(@Nonnull OperationContext opContext, @Nonnull Urn urn, @Nullable String runId); /** * Gets a list of documents that match given search request. The results are aggregated and @@ -329,4 +330,41 @@ ExplainResponse explain( * @return convent */ IndexConvention getIndexConvention(); + + default void appendRunId( + @Nonnull final OperationContext opContext, @Nonnull List results) { + + // Only updates with runId + Map, Set> urnRunIdToBatchItem = + results.stream() + .filter(Objects::nonNull) + .filter( + result -> result.getUrn() != null && (result.isProcessedMCL() || result.isUpdate())) + .filter( + result -> + result.getResult() != null + && result.getRequest().getSystemMetadata() != null + && result.getRequest().getSystemMetadata().hasRunId()) + .map( + result -> + Map.entry( + Pair.of( + result.getUrn(), result.getRequest().getSystemMetadata().getRunId()), + result)) + .collect( + Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(e -> e.getValue().getRequest(), Collectors.toSet()))); + + // Only update if not key aspect (document doesn't exist) + urnRunIdToBatchItem.entrySet().stream() + .filter( + entry -> + entry.getValue().stream() + .noneMatch( + item -> + item.getEntitySpec().getKeyAspectName().equals(item.getAspectName()))) + .forEach( + entry -> appendRunId(opContext, entry.getKey().getKey(), entry.getKey().getValue())); + } } diff --git a/metadata-service/services/src/test/java/com/linkedin/metadata/service/search/EntitySearchServiceTest.java b/metadata-service/services/src/test/java/com/linkedin/metadata/service/search/EntitySearchServiceTest.java new file mode 100644 index 0000000000000..41e2c2f006e94 --- /dev/null +++ b/metadata-service/services/src/test/java/com/linkedin/metadata/service/search/EntitySearchServiceTest.java @@ -0,0 +1,345 @@ +package com.linkedin.metadata.service.search; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.browse.BrowseResult; +import com.linkedin.metadata.browse.BrowseResultV2; +import com.linkedin.metadata.entity.IngestResult; +import com.linkedin.metadata.entity.UpdateAspectResult; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.ScrollResult; +import com.linkedin.metadata.search.SearchResult; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.mxe.SystemMetadata; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opensearch.action.explain.ExplainResponse; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class EntitySearchServiceTest { + private static final Urn TEST_URN = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.human_profiles,PROD)"); + + private OperationContext opContext = TestOperationContexts.systemContextNoValidate(); + + private EntitySearchService testInstance; + + @BeforeClass + public void setup() { + testInstance = spy(new TestEntitySearchService()); + } + + @Test + public void testAppendRunId_EmptyList() { + List results = new ArrayList<>(); + testInstance.appendRunId(opContext, results); + // Verify no interactions since list is empty + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_NullResults() { + List results = new ArrayList<>(); + results.add(null); + testInstance.appendRunId(opContext, results); + // Verify no interactions since all results are null + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_ValidResult() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + BatchItem mockRequest = mock(BatchItem.class); + SystemMetadata mockSystemMetadata = mock(SystemMetadata.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(true); + when(result.getResult()).thenReturn(mock(UpdateAspectResult.class)); + when(result.getRequest()).thenReturn(mockRequest); + when(mockRequest.getSystemMetadata()).thenReturn(mockSystemMetadata); + when(mockSystemMetadata.hasRunId()).thenReturn(true); + when(mockSystemMetadata.getRunId()).thenReturn("test-run-id"); + when(mockRequest.getEntitySpec()) + .thenReturn(opContext.getEntityRegistry().getEntitySpec(TEST_URN.getEntityType())); + when(mockRequest.getAspectName()).thenReturn("status"); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was called with correct parameters + verify(testInstance).appendRunId(eq(opContext), eq(TEST_URN), eq("test-run-id")); + } + + @Test + public void testAppendRunId_KeyAspectMatch() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + BatchItem mockRequest = mock(BatchItem.class); + SystemMetadata mockSystemMetadata = mock(SystemMetadata.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(true); + when(result.getResult()).thenReturn(mock(UpdateAspectResult.class)); + when(result.getRequest()).thenReturn(mockRequest); + when(mockRequest.getSystemMetadata()).thenReturn(mockSystemMetadata); + when(mockSystemMetadata.hasRunId()).thenReturn(true); + when(mockSystemMetadata.getRunId()).thenReturn("test-run-id"); + when(mockRequest.getEntitySpec()) + .thenReturn(opContext.getEntityRegistry().getEntitySpec(TEST_URN.getEntityType())); + when(mockRequest.getAspectName()) + .thenReturn( + opContext + .getEntityRegistry() + .getEntitySpec(TEST_URN.getEntityType()) + .getKeyAspectName()); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was not called because aspect names match + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_NoRunId() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + BatchItem mockRequest = mock(BatchItem.class); + SystemMetadata mockSystemMetadata = mock(SystemMetadata.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(true); + when(result.getResult()).thenReturn(mock(UpdateAspectResult.class)); + when(result.getRequest()).thenReturn(mockRequest); + when(mockRequest.getSystemMetadata()).thenReturn(mockSystemMetadata); + when(mockSystemMetadata.hasRunId()).thenReturn(false); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was not called because there's no run ID + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + @Test + public void testAppendRunId_NotProcessedOrUpdated() { + // Create test data + List results = new ArrayList<>(); + IngestResult result = mock(IngestResult.class); + + // Setup mock behaviors + when(result.getUrn()).thenReturn(TEST_URN); + when(result.isProcessedMCL()).thenReturn(false); + when(result.isUpdate()).thenReturn(false); + + results.add(result); + + // Execute + testInstance.appendRunId(opContext, results); + + // Verify appendRunId was not called because result is neither processed nor updated + verify(testInstance, never()).appendRunId(any(), any(Urn.class), anyString()); + } + + private static class TestEntitySearchService implements EntitySearchService { + @Override + public void clear(OperationContext opContext) {} + + @Override + public long docCount(OperationContext opContext, String entityName, Filter filter) { + return 0; + } + + @Override + public void upsertDocument( + OperationContext opContext, String entityName, String document, String docId) {} + + @Override + public void deleteDocument(OperationContext opContext, String entityName, String docId) {} + + @Override + public void appendRunId(OperationContext opContext, Urn urn, String runId) {} + + @Override + public SearchResult search( + OperationContext opContext, + List entityNames, + String input, + Filter postFilters, + List sortCriteria, + int from, + int size) { + return null; + } + + @Override + public SearchResult search( + OperationContext opContext, + List entityNames, + String input, + Filter postFilters, + List sortCriteria, + int from, + int size, + List facets) { + return null; + } + + @Override + public SearchResult filter( + OperationContext opContext, + String entityName, + Filter filters, + List sortCriteria, + int from, + int size) { + return null; + } + + @Override + public AutoCompleteResult autoComplete( + OperationContext opContext, + String entityName, + String query, + String field, + Filter requestParams, + int limit) { + return null; + } + + @Override + public Map aggregateByValue( + OperationContext opContext, + List entityNames, + String field, + Filter requestParams, + int limit) { + return null; + } + + @Override + public BrowseResult browse( + OperationContext opContext, + String entityName, + String path, + Filter requestParams, + int from, + int size) { + return null; + } + + @Override + public BrowseResultV2 browseV2( + OperationContext opContext, + String entityName, + String path, + Filter filter, + String input, + int start, + int count) { + return null; + } + + @Nonnull + @Override + public BrowseResultV2 browseV2( + @Nonnull OperationContext opContext, + @Nonnull List entityNames, + @Nonnull String path, + @Nullable Filter filter, + @Nonnull String input, + int start, + int count) { + return null; + } + + @Override + public List getBrowsePaths(OperationContext opContext, String entityName, Urn urn) { + return null; + } + + @Override + public ScrollResult fullTextScroll( + OperationContext opContext, + List entities, + String input, + Filter postFilters, + List sortCriteria, + String scrollId, + String keepAlive, + int size) { + return null; + } + + @Override + public ScrollResult structuredScroll( + OperationContext opContext, + List entities, + String input, + Filter postFilters, + List sortCriteria, + String scrollId, + String keepAlive, + int size) { + return null; + } + + @Override + public int maxResultSize() { + return 0; + } + + @Override + public ExplainResponse explain( + OperationContext opContext, + String query, + String documentId, + String entityName, + Filter postFilters, + List sortCriteria, + String scrollId, + String keepAlive, + int size, + List facets) { + return null; + } + + @Override + public IndexConvention getIndexConvention() { + return null; + } + } +}