Skip to content
This repository has been archived by the owner on Jun 26, 2024. It is now read-only.

Commit

Permalink
refactor: parallelize total entities query (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-steinfeld authored Jun 28, 2022
1 parent 2b307b8 commit f25cce6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class EntityService {
new BulkUpdateEntitiesRequestValidator();
private final AttributeMetadataProvider metadataProvider;
private final EntityIdColumnsConfigs entityIdColumnsConfigs;
private ExecutorService queryExecutor;
private final EntityInteractionsFetcher interactionsFetcher;
private final RequestPreProcessor requestPreProcessor;
private final ResponsePostProcessor responsePostProcessor;
Expand All @@ -84,6 +85,7 @@ public EntityService(
ExecutorService queryExecutor) {
this.metadataProvider = metadataProvider;
this.entityIdColumnsConfigs = entityIdColumnsConfigs;
this.queryExecutor = queryExecutor;
this.interactionsFetcher =
new EntityInteractionsFetcher(qsClient, qsRequestTimeout, metadataProvider, queryExecutor);
this.requestPreProcessor = new RequestPreProcessor(metadataProvider, scopeFilterConfigs);
Expand Down Expand Up @@ -167,7 +169,8 @@ public EntitiesResponse getEntities(
*/
EntityResponse response =
executionTree.acceptVisitor(
new ExecutionVisitor(executionContext, EntityQueryHandlerRegistry.get()));
new ExecutionVisitor(
executionContext, EntityQueryHandlerRegistry.get(), this.queryExecutor));

EntityFetcherResponse entityFetcherResponse = response.getEntityFetcherResponse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hypertrace.gateway.service.common.datafetcher.EntityFetcherResponse;
Expand Down Expand Up @@ -47,12 +49,16 @@ public class ExecutionVisitor implements Visitor<EntityResponse> {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionVisitor.class);

private final EntityQueryHandlerRegistry queryHandlerRegistry;
private final ExecutorService executorService;
private final EntityExecutionContext executionContext;

public ExecutionVisitor(
EntityExecutionContext executionContext, EntityQueryHandlerRegistry queryHandlerRegistry) {
EntityExecutionContext executionContext,
EntityQueryHandlerRegistry queryHandlerRegistry,
ExecutorService executorService) {
this.executionContext = executionContext;
this.queryHandlerRegistry = queryHandlerRegistry;
this.executorService = executorService;
}

private static EntityFetcherResponse intersectEntities(List<EntityFetcherResponse> builders) {
Expand Down Expand Up @@ -157,10 +163,11 @@ public EntityResponse visit(DataFetcherNode dataFetcherNode) {
// total, the total number of entities has to be fetched separately
if (dataFetcherNode.canFetchTotal()) {
// since, the pagination is pushed down to the data store, total can be requested directly
// from the data store
return new EntityResponse(
entityFetcher.getEntities(context, request),
entityFetcher.getTotal(context, entitiesRequest));
// from the data store. Request it async to parallelize with rest of entity fetch
CompletableFuture<Long> totalFuture =
CompletableFuture.supplyAsync(
() -> entityFetcher.getTotal(context, entitiesRequest), this.executorService);
return new EntityResponse(entityFetcher.getEntities(context, request), totalFuture.join());
} else {
// if the data fetcher node is not paginating, the total number of entities is equal to number
// of records fetched
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.hypertrace.core.attribute.service.v1.AttributeScope;
Expand Down Expand Up @@ -133,7 +134,9 @@ public void setup() {
.thenReturn(queryServiceEntityFetcher);
when(entityQueryHandlerRegistry.getEntityFetcher(EDS_SOURCE))
.thenReturn(entityDataServiceEntityFetcher);
executionVisitor = new ExecutionVisitor(executionContext, entityQueryHandlerRegistry);
executionVisitor =
new ExecutionVisitor(
executionContext, entityQueryHandlerRegistry, Executors.newSingleThreadExecutor());
}

@Test
Expand Down Expand Up @@ -756,7 +759,9 @@ public void test_visitPaginateOnlyNode() {
@Test
public void test_visitSelectionNode_differentSource_callSeparatedCalls() {
ExecutionVisitor executionVisitor =
spy(new ExecutionVisitor(executionContext, entityQueryHandlerRegistry));
spy(
new ExecutionVisitor(
executionContext, entityQueryHandlerRegistry, Executors.newSingleThreadExecutor()));
when(executionContext.getTimestampAttributeId()).thenReturn("API.startTime");
SelectionNode selectionNode =
new SelectionNode.Builder(new NoOpNode())
Expand Down Expand Up @@ -925,7 +930,9 @@ public void test_visitSelectionNode_nonEmptyFilter_emptyResult() {
.setFilter(generateEQFilter(API_DISCOVERY_STATE, "DISCOVERED"))
.build();
ExecutionVisitor executionVisitor =
spy(new ExecutionVisitor(executionContext, entityQueryHandlerRegistry));
spy(
new ExecutionVisitor(
executionContext, entityQueryHandlerRegistry, Executors.newSingleThreadExecutor()));
when(executionContext.getEntitiesRequest()).thenReturn(entitiesRequest);

// Selection node with NoOp child, to short-circuit the call to first service.
Expand Down

0 comments on commit f25cce6

Please sign in to comment.