Skip to content

Commit

Permalink
Complete futures for async produce
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Jul 24, 2023
1 parent f87b74e commit fc4e48b
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1062,17 +1062,17 @@ public Map<Urn, Entity> getEntities(@Nonnull final Set<Urn> urns, @Nonnull Set<S

@Override
public Pair<Future<?>, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull final AspectSpec aspectSpec,
@Nonnull final MetadataChangeLog metadataChangeLog) {
@Nonnull final MetadataChangeLog metadataChangeLog) {
Future<?> future = _producer.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
return Pair.of(future, preprocessEvent(metadataChangeLog));
}

@Override
public Pair<Future<?>, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName,
@Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue,
@Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata,
@Nullable final SystemMetadata newSystemMetadata, @Nonnull AuditStamp auditStamp,
@Nonnull final ChangeType changeType) {
@Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue,
@Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata,
@Nullable final SystemMetadata newSystemMetadata, @Nonnull AuditStamp auditStamp,
@Nonnull final ChangeType changeType) {
final MetadataChangeLog metadataChangeLog = constructMCL(null, entityName, urn, changeType, aspectName, auditStamp,
newAspectValue, newSystemMetadata, oldAspectValue, oldSystemMetadata);
return alwaysProduceMCLAsync(urn, aspectSpec, metadataChangeLog);
Expand Down Expand Up @@ -1382,24 +1382,33 @@ public RollbackRunResult rollbackWithConditions(List<AspectRowSummary> aspectRow
List<AspectRowSummary> removedAspects = new ArrayList<>();
AtomicInteger rowsDeletedFromEntityDeletion = new AtomicInteger(0);

aspectRows.forEach(aspectToRemove -> {

List<Future<?>> futures = aspectRows.stream().map(aspectToRemove -> {
RollbackResult result = deleteAspect(aspectToRemove.getUrn(), aspectToRemove.getAspectName(),
conditions, hardDelete);
if (result != null) {
Optional<AspectSpec> aspectSpec = getAspectSpec(result.entityName, result.aspectName);
if (!aspectSpec.isPresent()) {
log.error("Issue while rolling back: unknown aspect {} for entity {}", result.entityName, result.aspectName);
return;
return null;
}

rowsDeletedFromEntityDeletion.addAndGet(result.additionalRowsAffected);
removedAspects.add(aspectToRemove);
alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(),
return alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), aspectSpec.get(),
result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(),
// TODO: use properly attributed audit stamp.
createSystemAuditStamp(),
result.getChangeType());
result.getChangeType()).getFirst();
}

return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

futures.forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

Expand Down Expand Up @@ -1438,11 +1447,19 @@ public RollbackRunResult deleteUrn(Urn urn) {

rowsDeletedFromEntityDeletion = result.additionalRowsAffected;
removedAspects.add(summary);
alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), keySpec,
Future<?> future = alwaysProduceMCLAsync(result.getUrn(), result.getEntityName(), result.getAspectName(), keySpec,
result.getOldValue(), result.getNewValue(), result.getOldSystemMetadata(), result.getNewSystemMetadata(),
// TODO: Use a proper inferred audit stamp
createSystemAuditStamp(),
result.getChangeType());
result.getChangeType()).getFirst();

if (future != null) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

return new RollbackRunResult(removedAspects, rowsDeletedFromEntityDeletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.*;
import static com.linkedin.metadata.DockerTestUtils.*;
import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.AUTO_COMPLETE_ENTITY_TYPES;
import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.SEARCHABLE_ENTITY_TYPES;
import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine;

public class ESTestUtils {
private ESTestUtils() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.metadata.search.EntitySearchService;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -73,6 +78,7 @@ private int getAndReIndexDataPlatforms(AuditStamp auditStamp, AspectSpec dataPla
);

// Loop over Data platforms and produce changelog
List<Future<?>> futures = new LinkedList<>();
for (Urn dpUrn : dataPlatformUrns) {
EntityResponse dataPlatformEntityResponse = dataPlatformInfoResponses.get(dpUrn);
if (dataPlatformEntityResponse == null) {
Expand All @@ -86,7 +92,7 @@ private int getAndReIndexDataPlatforms(AuditStamp auditStamp, AspectSpec dataPla
continue;
}

_entityService.alwaysProduceMCLAsync(
futures.add(_entityService.alwaysProduceMCLAsync(
dpUrn,
Constants.DATA_PLATFORM_ENTITY_NAME,
Constants.DATA_PLATFORM_INFO_ASPECT_NAME,
Expand All @@ -96,9 +102,17 @@ private int getAndReIndexDataPlatforms(AuditStamp auditStamp, AspectSpec dataPla
null,
null,
auditStamp,
ChangeType.RESTATE);
ChangeType.RESTATE).getFirst());
}

futures.stream().filter(Objects::nonNull).forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

return listResult.getTotal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Slf4j
public class RestoreColumnLineageIndices extends UpgradeStep {
Expand Down Expand Up @@ -89,6 +93,7 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp
return latestAspects.getTotalCount();
}

List<Future<?>> futures = new LinkedList<>();
for (int i = 0; i < latestAspects.getValues().size(); i++) {
ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i);
RecordTemplate upstreamLineageRecord = latestAspects.getValues().get(i);
Expand All @@ -99,7 +104,7 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp
continue;
}

_entityService.alwaysProduceMCLAsync(
futures.add(_entityService.alwaysProduceMCLAsync(
urn,
Constants.DATASET_ENTITY_NAME,
Constants.UPSTREAM_LINEAGE_ASPECT_NAME,
Expand All @@ -109,9 +114,17 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp
null,
null,
auditStamp,
ChangeType.RESTATE);
ChangeType.RESTATE).getFirst());
}

futures.stream().filter(Objects::nonNull).forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

return latestAspects.getTotalCount();
}

Expand Down Expand Up @@ -140,6 +153,7 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS
return latestAspects.getTotalCount();
}

List<Future<?>> futures = new LinkedList<>();
for (int i = 0; i < latestAspects.getValues().size(); i++) {
ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i);
RecordTemplate inputFieldsRecord = latestAspects.getValues().get(i);
Expand All @@ -150,7 +164,7 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS
continue;
}

_entityService.alwaysProduceMCLAsync(
futures.add(_entityService.alwaysProduceMCLAsync(
urn,
entityName,
Constants.INPUT_FIELDS_ASPECT_NAME,
Expand All @@ -160,9 +174,17 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS
null,
null,
auditStamp,
ChangeType.RESTATE);
ChangeType.RESTATE).getFirst());
}

futures.stream().filter(Objects::nonNull).forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

return latestAspects.getTotalCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -126,6 +130,7 @@ private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStam
}

// Loop over datasets and produce changelog
List<Future<?>> futures = new LinkedList<>();
for (Urn datasetUrn : datasetUrns) {
EntityResponse response = upstreamLineageResponse.get(datasetUrn);
if (response == null) {
Expand All @@ -137,7 +142,7 @@ private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStam
continue;
}

_entityService.alwaysProduceMCLAsync(
futures.add(_entityService.alwaysProduceMCLAsync(
datasetUrn,
DATASET_ENTITY_NAME,
UPSTREAM_LINEAGE_ASPECT_NAME,
Expand All @@ -147,8 +152,16 @@ private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStam
null,
null,
auditStamp,
ChangeType.RESTATE);
ChangeType.RESTATE).getFirst());
}

futures.stream().filter(Objects::nonNull).forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}

private UpstreamLineage getUpstreamLineage(EntityResponse entityResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -87,6 +92,7 @@ null, start, BATCH_SIZE, new SearchFlags().setFulltext(false)
);

// Loop over Terms and produce changelog
List<Future<?>> futures = new LinkedList<>();
for (Urn termUrn : termUrns) {
EntityResponse termEntityResponse = termInfoResponses.get(termUrn);
if (termEntityResponse == null) {
Expand All @@ -99,7 +105,7 @@ null, start, BATCH_SIZE, new SearchFlags().setFulltext(false)
continue;
}

_entityService.alwaysProduceMCLAsync(
futures.add(_entityService.alwaysProduceMCLAsync(
termUrn,
Constants.GLOSSARY_TERM_ENTITY_NAME,
Constants.GLOSSARY_TERM_INFO_ASPECT_NAME,
Expand All @@ -109,9 +115,17 @@ null, start, BATCH_SIZE, new SearchFlags().setFulltext(false)
null,
null,
auditStamp,
ChangeType.RESTATE);
ChangeType.RESTATE).getFirst());
}

futures.stream().filter(Objects::nonNull).forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

return termsResult.getNumEntities();
}

Expand All @@ -130,6 +144,7 @@ null, null, start, BATCH_SIZE, new SearchFlags().setFulltext(false)
);

// Loop over Nodes and produce changelog
List<Future<?>> futures = new LinkedList<>();
for (Urn nodeUrn : nodeUrns) {
EntityResponse nodeEntityResponse = nodeInfoResponses.get(nodeUrn);
if (nodeEntityResponse == null) {
Expand All @@ -142,7 +157,7 @@ null, null, start, BATCH_SIZE, new SearchFlags().setFulltext(false)
continue;
}

_entityService.alwaysProduceMCLAsync(
futures.add(_entityService.alwaysProduceMCLAsync(
nodeUrn,
Constants.GLOSSARY_NODE_ENTITY_NAME,
Constants.GLOSSARY_NODE_INFO_ASPECT_NAME,
Expand All @@ -152,9 +167,17 @@ null, null, start, BATCH_SIZE, new SearchFlags().setFulltext(false)
null,
null,
auditStamp,
ChangeType.RESTATE);
ChangeType.RESTATE).getFirst());
}

futures.stream().filter(Objects::nonNull).forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

return nodesResult.getNumEntities();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import org.mockito.Mockito;
import org.testng.annotations.Test;

Expand All @@ -29,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

public class RestoreColumnLineageIndicesTest {

Expand Down Expand Up @@ -234,6 +236,12 @@ private void mockGetUpstreamLineage(@Nonnull Urn datasetUrn, @Nonnull EntityServ
.setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L))
);

Mockito.when(mockService.alwaysProduceMCLAsync(
Mockito.any(Urn.class), Mockito.anyString(), Mockito.anyString(), Mockito.any(AspectSpec.class),
Mockito.eq(null), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any(ChangeType.class)
)).thenReturn(Pair.of(Mockito.mock(Future.class), false));

Mockito.when(mockService.listLatestAspects(
Mockito.eq(Constants.DATASET_ENTITY_NAME),
Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME),
Expand Down
Loading

0 comments on commit fc4e48b

Please sign in to comment.