Skip to content
This repository has been archived by the owner on Jun 26, 2024. It is now read-only.

Commit

Permalink
fix: refactor interaction request code to execute qs queries in paral…
Browse files Browse the repository at this point in the history
…lel (#120)

* test: adds e2e test for entity interaction request

* adds incoming interaction and assertions for the test

* adds requst and response objects for query service

* add executor service and rename few variables

* makes threadpool to 6 threads

* addressed comments of reusing intraction request

* fixed stray char

* use the common executor pool defined at gateway service level

* fixed snyk issue
  • Loading branch information
kotharironak authored Mar 21, 2022
1 parent 2255fd7 commit ed5b7bb
Show file tree
Hide file tree
Showing 9 changed files with 854 additions and 51 deletions.
4 changes: 2 additions & 2 deletions gateway-service-impl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ dependencies {
implementation("com.google.protobuf:protobuf-java-util:3.19.4")
implementation("com.google.guava:guava:30.1.1-jre")

implementation("com.fasterxml.jackson.core:jackson-annotations:2.13.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.13.2")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.2")

testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
testImplementation("org.mockito:mockito-core:4.3.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public GatewayServiceImpl(Config appConfig) {
attributeMetadataProvider,
entityIdColumnsConfigs,
scopeFilterConfigs,
logConfig);
logConfig,
queryExecutor);
this.exploreService =
new ExploreService(
queryServiceClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.hypertrace.gateway.service.common.datafetcher;

import org.hypertrace.core.query.service.api.QueryRequest;
import org.hypertrace.gateway.service.v1.entity.InteractionsRequest;

public class EntityInteractionQueryRequest {
private boolean isIncoming;
private String entityType;
private InteractionsRequest interactionsRequest;
private QueryRequest request;

public EntityInteractionQueryRequest(
boolean isIncoming,
String entityType,
InteractionsRequest interactionsRequest,
QueryRequest request) {
this.isIncoming = isIncoming;
this.entityType = entityType;
this.interactionsRequest = interactionsRequest;
this.request = request;
}

public boolean isIncoming() {
return isIncoming;
}

public String getEntityType() {
return entityType;
}

public InteractionsRequest getInteractionsRequest() {
return interactionsRequest;
}

public QueryRequest getRequest() {
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.hypertrace.gateway.service.common.datafetcher;

import java.util.Iterator;
import org.hypertrace.core.query.service.api.ResultSetChunk;

public class EntityInteractionQueryResponse {
private EntityInteractionQueryRequest request;
private Iterator<ResultSetChunk> resultSetChunkIterator;

public EntityInteractionQueryResponse(
EntityInteractionQueryRequest request, Iterator<ResultSetChunk> resultSetChunkIterator) {
this.request = request;
this.resultSetChunkIterator = resultSetChunkIterator;
}

public EntityInteractionQueryRequest getRequest() {
return request;
}

public Iterator<ResultSetChunk> getResultSetChunkIterator() {
return resultSetChunkIterator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -88,14 +90,17 @@ public class EntityInteractionsFetcher {
private final QueryServiceClient queryServiceClient;
private final int queryServiceRequestTimeout;
private final AttributeMetadataProvider metadataProvider;
private final ExecutorService queryExecutor;

public EntityInteractionsFetcher(
QueryServiceClient queryServiceClient,
int qsRequestTimeout,
AttributeMetadataProvider metadataProvider) {
AttributeMetadataProvider metadataProvider,
ExecutorService queryExecutor) {
this.queryServiceClient = queryServiceClient;
this.queryServiceRequestTimeout = qsRequestTimeout;
this.metadataProvider = metadataProvider;
this.queryExecutor = queryExecutor;
}

private List<String> getEntityIdColumnsFromInteraction(
Expand All @@ -119,31 +124,79 @@ private List<String> getEntityIdColumnsFromInteraction(
}

public void populateEntityInteractions(
RequestContext context, EntitiesRequest request, Map<EntityKey, Builder> entityBuilders) {
// Process the incoming interactions
if (!InteractionsRequest.getDefaultInstance().equals(request.getIncomingInteractions())) {
addInteractions(
context,
request,
entityBuilders,
request.getIncomingInteractions(),
INCOMING,
"fromEntityType filter is mandatory for incoming interactions.");
RequestContext context,
EntitiesRequest entitiesRequest,
Map<EntityKey, Builder> entityBuilders) {
List<EntityInteractionQueryRequest> allQueryRequests = new ArrayList<>();
// Process the incoming interactions, and prepare QS queries
if (!InteractionsRequest.getDefaultInstance()
.equals(entitiesRequest.getIncomingInteractions())) {
allQueryRequests.addAll(
prepareQueryRequests(
context,
entitiesRequest,
entityBuilders,
entitiesRequest.getIncomingInteractions(),
INCOMING,
"fromEntityType filter is mandatory for incoming interactions."));
}

// Process the outgoing interactions
if (!InteractionsRequest.getDefaultInstance().equals(request.getOutgoingInteractions())) {
addInteractions(
context,
request,
entityBuilders,
request.getOutgoingInteractions(),
OUTGOING,
"toEntityType filter is mandatory for outgoing interactions.");
// Process the outgoing interactions, and prepare the QS queries
if (!InteractionsRequest.getDefaultInstance()
.equals(entitiesRequest.getOutgoingInteractions())) {
allQueryRequests.addAll(
prepareQueryRequests(
context,
entitiesRequest,
entityBuilders,
entitiesRequest.getOutgoingInteractions(),
OUTGOING,
"toEntityType filter is mandatory for outgoing interactions."));
}

// execute all the requests in parallel, and wait for results
List<CompletableFuture<EntityInteractionQueryResponse>> queryRequestCompletableFutures =
allQueryRequests.stream()
.map(
e ->
CompletableFuture.supplyAsync(
() -> executeQueryRequest(context, e), this.queryExecutor))
.collect(Collectors.toList());

// wait and parse result as an when complete
queryRequestCompletableFutures.forEach(
queryRequestCompletableFuture -> {
EntityInteractionQueryResponse qsResponse = queryRequestCompletableFuture.join();
InteractionsRequest interactionsRequest =
qsResponse.getRequest().getInteractionsRequest();

Map<String, FunctionExpression> metricToAggFunction =
MetricAggregationFunctionUtil.getAggMetricToFunction(
interactionsRequest.getSelectionList());

parseResultSet(
entitiesRequest.getEntityType(),
qsResponse.getRequest().getEntityType(),
interactionsRequest.getSelectionList(),
metricToAggFunction,
qsResponse.getResultSetChunkIterator(),
qsResponse.getRequest().isIncoming(),
entityBuilders,
context);
});
}

private void addInteractions(
private EntityInteractionQueryResponse executeQueryRequest(
RequestContext context, EntityInteractionQueryRequest entityInteractionQueryRequest) {
Iterator<ResultSetChunk> resultSet =
queryServiceClient.executeQuery(
entityInteractionQueryRequest.getRequest(),
context.getHeaders(),
queryServiceRequestTimeout);
return new EntityInteractionQueryResponse(entityInteractionQueryRequest, resultSet);
}

private List<EntityInteractionQueryRequest> prepareQueryRequests(
RequestContext context,
EntitiesRequest request,
Map<EntityKey, Builder> entityIdToBuilders,
Expand All @@ -168,27 +221,17 @@ private void addInteractions(
entityIdToBuilders.keySet(),
incoming,
context);

if (requests.isEmpty()) {
throw new IllegalArgumentException(errorMsg);
}

Map<String, FunctionExpression> metricToAggFunction =
MetricAggregationFunctionUtil.getAggMetricToFunction(
interactionsRequest.getSelectionList());
for (Map.Entry<String, QueryRequest> entry : requests.entrySet()) {
Iterator<ResultSetChunk> resultSet =
queryServiceClient.executeQuery(
entry.getValue(), context.getHeaders(), queryServiceRequestTimeout);
parseResultSet(
request.getEntityType(),
entry.getKey(),
interactionsRequest.getSelectionList(),
metricToAggFunction,
resultSet,
incoming,
entityIdToBuilders,
context);
}
return requests.entrySet().stream()
.map(
e ->
new EntityInteractionQueryRequest(
incoming, e.getKey(), interactionsRequest, e.getValue()))
.collect(Collectors.toList());
}

private Set<String> getOtherEntityTypes(org.hypertrace.gateway.service.v1.common.Filter filter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.hypertrace.core.attribute.service.v1.AttributeMetadata;
Expand Down Expand Up @@ -79,11 +80,12 @@ public EntityService(
AttributeMetadataProvider metadataProvider,
EntityIdColumnsConfigs entityIdColumnsConfigs,
ScopeFilterConfigs scopeFilterConfigs,
LogConfig logConfig) {
LogConfig logConfig,
ExecutorService queryExecutor) {
this.metadataProvider = metadataProvider;
this.entityIdColumnsConfigs = entityIdColumnsConfigs;
this.interactionsFetcher =
new EntityInteractionsFetcher(qsClient, qsRequestTimeout, metadataProvider);
new EntityInteractionsFetcher(qsClient, qsRequestTimeout, metadataProvider, queryExecutor);
this.requestPreProcessor = new RequestPreProcessor(metadataProvider, scopeFilterConfigs);
this.responsePostProcessor = new ResponsePostProcessor();
this.edsEntityUpdater = new EdsEntityUpdater(edsQueryServiceClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.hypertrace.core.attribute.service.v1.AttributeMetadata;
import org.hypertrace.core.attribute.service.v1.AttributeScope;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class EntityInteractionsFetcherTest extends AbstractGatewayServiceTest {
.build();

@Mock private AttributeMetadataProvider attributeMetadataProvider;
@Mock private ExecutorService queryExecutor;

@Test
public void testServiceToServiceEdgeQueryRequests() {
Expand All @@ -113,7 +115,7 @@ public void testServiceToServiceEdgeQueryRequests() {
.thenReturn(Optional.of(AttributeMetadata.newBuilder().setId("dummy").build()));

EntityInteractionsFetcher aggregator =
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider);
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor);
Map<String, QueryRequest> queryRequests =
aggregator.buildQueryRequests(
request.getStartTimeMillis(),
Expand Down Expand Up @@ -222,7 +224,7 @@ public void testFromMultipleEntitiesQuery() {
.thenReturn(Optional.of(AttributeMetadata.newBuilder().setId("dummy").build()));

EntityInteractionsFetcher aggregator =
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider);
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor);
Map<String, QueryRequest> queryRequests =
aggregator.buildQueryRequests(
request.getStartTimeMillis(),
Expand Down Expand Up @@ -286,7 +288,7 @@ public void testToMultipleEntityTypesQuery() {
Optional.of(AttributeMetadata.newBuilder().setId("INTERACTION.startTime").build()));

EntityInteractionsFetcher aggregator =
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider);
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor);
Map<String, QueryRequest> queryRequests =
aggregator.buildQueryRequests(
request.getStartTimeMillis(),
Expand Down Expand Up @@ -405,7 +407,7 @@ public void testEntityWithInteractionMappingToMultipleAttributes() {
Optional.of(AttributeMetadata.newBuilder().setId("INTERACTION.startTime").build()));

EntityInteractionsFetcher aggregator =
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider);
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor);
LinkedHashSet<EntityKey> entityKeys = new LinkedHashSet<>();
entityKeys.add(EntityKey.of("test_name1", "test_type1"));
entityKeys.add(EntityKey.of("test_name2", "test_type2"));
Expand Down Expand Up @@ -505,7 +507,7 @@ public void testCornerCases() {
.thenReturn(Optional.of(AttributeMetadata.newBuilder().setId("dummy").build()));

EntityInteractionsFetcher aggregator =
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider);
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor);
Map<String, QueryRequest> queryRequests =
aggregator.buildQueryRequests(
request.getStartTimeMillis(),
Expand Down Expand Up @@ -579,7 +581,7 @@ public void testInvalidRequests() {
Mockito.eq("startTime")))
.thenReturn(Optional.of(AttributeMetadata.newBuilder().setFqn("dummy").build()));
EntityInteractionsFetcher aggregator =
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider);
new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor);

for (EntitiesRequest request : getInvalidRequests()) {
try {
Expand Down
Loading

0 comments on commit ed5b7bb

Please sign in to comment.