From b2c223653251951504979c1da0b1eede10cee331 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Mon, 24 Jul 2023 00:34:16 -0500 Subject: [PATCH] Complete futures for async produce --- .../metadata/entity/EntityServiceImpl.java | 41 +++++++++++++------ .../boot/steps/IndexDataPlatformsStep.java | 18 +++++++- .../steps/RestoreColumnLineageIndices.java | 30 ++++++++++++-- .../boot/steps/RestoreDbtSiblingsIndices.java | 17 +++++++- .../boot/steps/RestoreGlossaryIndices.java | 31 ++++++++++++-- .../RestoreColumnLineageIndicesTest.java | 8 ++++ .../steps/RestoreGlossaryIndicesTest.java | 14 ++++++- 7 files changed, 134 insertions(+), 25 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 0d76ea19482144..6fbd027f34dafe 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1062,17 +1062,17 @@ public Map getEntities(@Nonnull final Set urns, @Nonnull Set, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull final AspectSpec aspectSpec, - @Nonnull final MetadataChangeLog metadataChangeLog) { + @Nonnull final MetadataChangeLog metadataChangeLog) { Future future = _producer.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog); return Pair.of(future, preprocessEvent(metadataChangeLog)); } @Override public Pair, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName, - @Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue, - @Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata, - @Nullable final SystemMetadata newSystemMetadata, @Nonnull AuditStamp auditStamp, - @Nonnull final ChangeType changeType) { + @Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue, + @Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata, + @Nullable final SystemMetadata newSystemMetadata, @Nonnull AuditStamp auditStamp, + @Nonnull final ChangeType changeType) { final MetadataChangeLog metadataChangeLog = constructMCL(null, entityName, urn, changeType, aspectName, auditStamp, newAspectValue, newSystemMetadata, oldAspectValue, oldSystemMetadata); return alwaysProduceMCLAsync(urn, aspectSpec, metadataChangeLog); @@ -1382,24 +1382,33 @@ public RollbackRunResult rollbackWithConditions(List aspectRow List removedAspects = new ArrayList<>(); AtomicInteger rowsDeletedFromEntityDeletion = new AtomicInteger(0); - aspectRows.forEach(aspectToRemove -> { - + List> futures = aspectRows.stream().map(aspectToRemove -> { RollbackResult result = deleteAspect(aspectToRemove.getUrn(), aspectToRemove.getAspectName(), conditions, hardDelete); if (result != null) { Optional aspectSpec = getAspectSpec(result.entityName, result.aspectName); if (!aspectSpec.isPresent()) { log.error("Issue while rolling back: unknown aspect {} for entity {}", result.entityName, result.aspectName); - return; + return null; } rowsDeletedFromEntityDeletion.addAndGet(result.additionalRowsAffected); removedAspects.add(aspectToRemove); - alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(), + return alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(), result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(), // TODO: use properly attributed audit stamp. createSystemAuditStamp(), - result.getChangeType()); + result.getChangeType()).getFirst(); + } + + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + + futures.forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } }); @@ -1438,11 +1447,19 @@ public RollbackRunResult deleteUrn(Urn urn) { rowsDeletedFromEntityDeletion = result.additionalRowsAffected; removedAspects.add(summary); - alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), keySpec, + Future future = alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), keySpec, result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(), // TODO: Use a proper inferred audit stamp createSystemAuditStamp(), - result.getChangeType()); + result.getChangeType()).getFirst(); + + if (future != null) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } } return new RollbackRunResult(removedAspects, rowsDeletedFromEntityDeletion); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IndexDataPlatformsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IndexDataPlatformsStep.java index 45eb9e35226e14..b26eb67465c0d3 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IndexDataPlatformsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IndexDataPlatformsStep.java @@ -13,10 +13,15 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.ListUrnsResult; import com.linkedin.metadata.search.EntitySearchService; + import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -73,6 +78,7 @@ private int getAndReIndexDataPlatforms(AuditStamp auditStamp, AspectSpec dataPla ); // Loop over Data platforms and produce changelog + List> futures = new LinkedList<>(); for (Urn dpUrn : dataPlatformUrns) { EntityResponse dataPlatformEntityResponse = dataPlatformInfoResponses.get(dpUrn); if (dataPlatformEntityResponse == null) { @@ -86,7 +92,7 @@ private int getAndReIndexDataPlatforms(AuditStamp auditStamp, AspectSpec dataPla continue; } - _entityService.alwaysProduceMCLAsync( + futures.add(_entityService.alwaysProduceMCLAsync( dpUrn, Constants.DATA_PLATFORM_ENTITY_NAME, Constants.DATA_PLATFORM_INFO_ASPECT_NAME, @@ -96,9 +102,17 @@ private int getAndReIndexDataPlatforms(AuditStamp auditStamp, AspectSpec dataPla null, null, auditStamp, - ChangeType.RESTATE); + ChangeType.RESTATE).getFirst()); } + futures.stream().filter(Objects::nonNull).forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + return listResult.getTotal(); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java index 654119287a6b1d..1f5f7f26ed89bf 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java @@ -16,7 +16,11 @@ import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; +import java.util.LinkedList; +import java.util.List; import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; @Slf4j public class RestoreColumnLineageIndices extends UpgradeStep { @@ -89,6 +93,7 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp return latestAspects.getTotalCount(); } + List> futures = new LinkedList<>(); for (int i = 0; i < latestAspects.getValues().size(); i++) { ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); RecordTemplate upstreamLineageRecord = latestAspects.getValues().get(i); @@ -99,7 +104,7 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp continue; } - _entityService.alwaysProduceMCLAsync( + futures.add(_entityService.alwaysProduceMCLAsync( urn, Constants.DATASET_ENTITY_NAME, Constants.UPSTREAM_LINEAGE_ASPECT_NAME, @@ -109,9 +114,17 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp null, null, auditStamp, - ChangeType.RESTATE); + ChangeType.RESTATE).getFirst()); } + futures.stream().filter(Objects::nonNull).forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + return latestAspects.getTotalCount(); } @@ -140,6 +153,7 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS return latestAspects.getTotalCount(); } + List> futures = new LinkedList<>(); for (int i = 0; i < latestAspects.getValues().size(); i++) { ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); RecordTemplate inputFieldsRecord = latestAspects.getValues().get(i); @@ -150,7 +164,7 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS continue; } - _entityService.alwaysProduceMCLAsync( + futures.add(_entityService.alwaysProduceMCLAsync( urn, entityName, Constants.INPUT_FIELDS_ASPECT_NAME, @@ -160,9 +174,17 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS null, null, auditStamp, - ChangeType.RESTATE); + ChangeType.RESTATE).getFirst()); } + futures.stream().filter(Objects::nonNull).forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + return latestAspects.getTotalCount(); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java index 4828e3b2b2b289..355936fe1994c4 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java @@ -23,8 +23,12 @@ import java.net.URISyntaxException; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nonnull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -126,6 +130,7 @@ private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStam } // Loop over datasets and produce changelog + List> futures = new LinkedList<>(); for (Urn datasetUrn : datasetUrns) { EntityResponse response = upstreamLineageResponse.get(datasetUrn); if (response == null) { @@ -137,7 +142,7 @@ private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStam continue; } - _entityService.alwaysProduceMCLAsync( + futures.add(_entityService.alwaysProduceMCLAsync( datasetUrn, DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, @@ -147,8 +152,16 @@ private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStam null, null, auditStamp, - ChangeType.RESTATE); + ChangeType.RESTATE).getFirst()); } + + futures.stream().filter(Objects::nonNull).forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); } private UpstreamLineage getUpstreamLineage(EntityResponse entityResponse) { diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java index a5ea95a271b7f5..23dd2284cb3a60 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndices.java @@ -16,10 +16,15 @@ import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchResult; + import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -87,6 +92,7 @@ null, start, BATCH_SIZE, new SearchFlags().setFulltext(false) ); // Loop over Terms and produce changelog + List> futures = new LinkedList<>(); for (Urn termUrn : termUrns) { EntityResponse termEntityResponse = termInfoResponses.get(termUrn); if (termEntityResponse == null) { @@ -99,7 +105,7 @@ null, start, BATCH_SIZE, new SearchFlags().setFulltext(false) continue; } - _entityService.alwaysProduceMCLAsync( + futures.add(_entityService.alwaysProduceMCLAsync( termUrn, Constants.GLOSSARY_TERM_ENTITY_NAME, Constants.GLOSSARY_TERM_INFO_ASPECT_NAME, @@ -109,9 +115,17 @@ null, start, BATCH_SIZE, new SearchFlags().setFulltext(false) null, null, auditStamp, - ChangeType.RESTATE); + ChangeType.RESTATE).getFirst()); } + futures.stream().filter(Objects::nonNull).forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + return termsResult.getNumEntities(); } @@ -130,6 +144,7 @@ null, null, start, BATCH_SIZE, new SearchFlags().setFulltext(false) ); // Loop over Nodes and produce changelog + List> futures = new LinkedList<>(); for (Urn nodeUrn : nodeUrns) { EntityResponse nodeEntityResponse = nodeInfoResponses.get(nodeUrn); if (nodeEntityResponse == null) { @@ -142,7 +157,7 @@ null, null, start, BATCH_SIZE, new SearchFlags().setFulltext(false) continue; } - _entityService.alwaysProduceMCLAsync( + futures.add(_entityService.alwaysProduceMCLAsync( nodeUrn, Constants.GLOSSARY_NODE_ENTITY_NAME, Constants.GLOSSARY_NODE_INFO_ASPECT_NAME, @@ -152,9 +167,17 @@ null, null, start, BATCH_SIZE, new SearchFlags().setFulltext(false) null, null, auditStamp, - ChangeType.RESTATE); + ChangeType.RESTATE).getFirst()); } + futures.stream().filter(Objects::nonNull).forEach(f -> { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + return nodesResult.getNumEntities(); } diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java index d90c1947b3e5e4..aca5e322567d8c 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java @@ -21,6 +21,7 @@ import com.linkedin.metadata.query.ExtraInfoArray; import com.linkedin.metadata.query.ListResultMetadata; import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.util.Pair; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; public class RestoreColumnLineageIndicesTest { @@ -234,6 +236,12 @@ private void mockGetUpstreamLineage(@Nonnull Urn datasetUrn, @Nonnull EntityServ .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)) ); + Mockito.when(mockService.alwaysProduceMCLAsync( + Mockito.any(Urn.class), Mockito.anyString(), Mockito.anyString(), Mockito.any(AspectSpec.class), + Mockito.eq(null), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(ChangeType.class) + )).thenReturn(Pair.of(Mockito.mock(Future.class), false)); + Mockito.when(mockService.listLatestAspects( Mockito.eq(Constants.DATASET_ENTITY_NAME), Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java index e6104c9c590638..c4bbfaf052ed86 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java @@ -21,6 +21,8 @@ import com.linkedin.metadata.search.SearchResult; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.util.Pair; +import org.mockito.Mock; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -28,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.Future; public class RestoreGlossaryIndicesTest { @@ -93,6 +96,11 @@ public void testExecuteFirstTime() throws Exception { upgradeEntityUrn, Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME) )).thenReturn(null); + Mockito.when(mockService.alwaysProduceMCLAsync( + Mockito.any(Urn.class), Mockito.anyString(), Mockito.anyString(), Mockito.any(AspectSpec.class), + Mockito.eq(null), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(ChangeType.class) + )).thenReturn(Pair.of(Mockito.mock(Future.class), false)); mockGetTermInfo(glossaryTermUrn, mockSearchService, mockService); mockGetNodeInfo(glossaryNodeUrn, mockSearchService, mockService); @@ -154,6 +162,11 @@ public void testExecutesWithNewVersion() throws Exception { upgradeEntityUrn, Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME) )).thenReturn(response); + Mockito.when(mockService.alwaysProduceMCLAsync( + Mockito.any(Urn.class), Mockito.anyString(), Mockito.anyString(), Mockito.any(AspectSpec.class), + Mockito.eq(null), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(ChangeType.class) + )).thenReturn(Pair.of(Mockito.mock(Future.class), false)); mockGetTermInfo(glossaryTermUrn, mockSearchService, mockService); mockGetNodeInfo(glossaryNodeUrn, mockSearchService, mockService); @@ -163,7 +176,6 @@ public void testExecutesWithNewVersion() throws Exception { RestoreGlossaryIndices restoreIndicesStep = new RestoreGlossaryIndices(mockService, mockSearchService, mockRegistry); restoreIndicesStep.execute(); - Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME); Mockito.verify(mockService, Mockito.times(2)).ingestProposal(