diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java index dca77b86..59b72ce4 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java @@ -64,6 +64,7 @@ public class EntityService { new BulkUpdateEntitiesRequestValidator(); private final AttributeMetadataProvider metadataProvider; private final EntityIdColumnsConfigs entityIdColumnsConfigs; + private ExecutorService queryExecutor; private final EntityInteractionsFetcher interactionsFetcher; private final RequestPreProcessor requestPreProcessor; private final ResponsePostProcessor responsePostProcessor; @@ -84,6 +85,7 @@ public EntityService( ExecutorService queryExecutor) { this.metadataProvider = metadataProvider; this.entityIdColumnsConfigs = entityIdColumnsConfigs; + this.queryExecutor = queryExecutor; this.interactionsFetcher = new EntityInteractionsFetcher(qsClient, qsRequestTimeout, metadataProvider, queryExecutor); this.requestPreProcessor = new RequestPreProcessor(metadataProvider, scopeFilterConfigs); @@ -167,7 +169,8 @@ public EntitiesResponse getEntities( */ EntityResponse response = executionTree.acceptVisitor( - new ExecutionVisitor(executionContext, EntityQueryHandlerRegistry.get())); + new ExecutionVisitor( + executionContext, EntityQueryHandlerRegistry.get(), this.queryExecutor)); EntityFetcherResponse entityFetcherResponse = response.getEntityFetcherResponse(); diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitor.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitor.java index 8c370d47..7a77c093 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitor.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitor.java @@ -11,6 +11,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.hypertrace.gateway.service.common.datafetcher.EntityFetcherResponse; @@ -47,12 +49,16 @@ public class ExecutionVisitor implements Visitor { private static final Logger LOG = LoggerFactory.getLogger(ExecutionVisitor.class); private final EntityQueryHandlerRegistry queryHandlerRegistry; + private final ExecutorService executorService; private final EntityExecutionContext executionContext; public ExecutionVisitor( - EntityExecutionContext executionContext, EntityQueryHandlerRegistry queryHandlerRegistry) { + EntityExecutionContext executionContext, + EntityQueryHandlerRegistry queryHandlerRegistry, + ExecutorService executorService) { this.executionContext = executionContext; this.queryHandlerRegistry = queryHandlerRegistry; + this.executorService = executorService; } private static EntityFetcherResponse intersectEntities(List builders) { @@ -157,10 +163,11 @@ public EntityResponse visit(DataFetcherNode dataFetcherNode) { // total, the total number of entities has to be fetched separately if (dataFetcherNode.canFetchTotal()) { // since, the pagination is pushed down to the data store, total can be requested directly - // from the data store - return new EntityResponse( - entityFetcher.getEntities(context, request), - entityFetcher.getTotal(context, entitiesRequest)); + // from the data store. Request it async to parallelize with rest of entity fetch + CompletableFuture totalFuture = + CompletableFuture.supplyAsync( + () -> entityFetcher.getTotal(context, entitiesRequest), this.executorService); + return new EntityResponse(entityFetcher.getEntities(context, request), totalFuture.join()); } else { // if the data fetcher node is not paginating, the total number of entities is equal to number // of records fetched diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitorTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitorTest.java index 2387839a..cdaa2ba5 100644 --- a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitorTest.java +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/query/visitor/ExecutionVisitorTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.Stream; import org.hypertrace.core.attribute.service.v1.AttributeScope; @@ -133,7 +134,9 @@ public void setup() { .thenReturn(queryServiceEntityFetcher); when(entityQueryHandlerRegistry.getEntityFetcher(EDS_SOURCE)) .thenReturn(entityDataServiceEntityFetcher); - executionVisitor = new ExecutionVisitor(executionContext, entityQueryHandlerRegistry); + executionVisitor = + new ExecutionVisitor( + executionContext, entityQueryHandlerRegistry, Executors.newSingleThreadExecutor()); } @Test @@ -756,7 +759,9 @@ public void test_visitPaginateOnlyNode() { @Test public void test_visitSelectionNode_differentSource_callSeparatedCalls() { ExecutionVisitor executionVisitor = - spy(new ExecutionVisitor(executionContext, entityQueryHandlerRegistry)); + spy( + new ExecutionVisitor( + executionContext, entityQueryHandlerRegistry, Executors.newSingleThreadExecutor())); when(executionContext.getTimestampAttributeId()).thenReturn("API.startTime"); SelectionNode selectionNode = new SelectionNode.Builder(new NoOpNode()) @@ -925,7 +930,9 @@ public void test_visitSelectionNode_nonEmptyFilter_emptyResult() { .setFilter(generateEQFilter(API_DISCOVERY_STATE, "DISCOVERED")) .build(); ExecutionVisitor executionVisitor = - spy(new ExecutionVisitor(executionContext, entityQueryHandlerRegistry)); + spy( + new ExecutionVisitor( + executionContext, entityQueryHandlerRegistry, Executors.newSingleThreadExecutor())); when(executionContext.getEntitiesRequest()).thenReturn(entitiesRequest); // Selection node with NoOp child, to short-circuit the call to first service.