diff --git a/gateway-service-impl/build.gradle.kts b/gateway-service-impl/build.gradle.kts index 3aa6e204..c402347f 100644 --- a/gateway-service-impl/build.gradle.kts +++ b/gateway-service-impl/build.gradle.kts @@ -27,8 +27,8 @@ dependencies { implementation("com.google.protobuf:protobuf-java-util:3.19.4") implementation("com.google.guava:guava:30.1.1-jre") - implementation("com.fasterxml.jackson.core:jackson-annotations:2.13.1") - implementation("com.fasterxml.jackson.core:jackson-databind:2.13.1") + implementation("com.fasterxml.jackson.core:jackson-annotations:2.13.2") + implementation("com.fasterxml.jackson.core:jackson-databind:2.13.2") testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") testImplementation("org.mockito:mockito-core:4.3.1") diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java index e1e80816..5fbe1964 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java @@ -112,7 +112,8 @@ public GatewayServiceImpl(Config appConfig) { attributeMetadataProvider, entityIdColumnsConfigs, scopeFilterConfigs, - logConfig); + logConfig, + queryExecutor); this.exploreService = new ExploreService( queryServiceClient, diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionQueryRequest.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionQueryRequest.java new file mode 100644 index 00000000..b2e4e192 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionQueryRequest.java @@ -0,0 +1,38 @@ +package org.hypertrace.gateway.service.common.datafetcher; + +import org.hypertrace.core.query.service.api.QueryRequest; +import org.hypertrace.gateway.service.v1.entity.InteractionsRequest; + +public class EntityInteractionQueryRequest { + private boolean isIncoming; + private String entityType; + private InteractionsRequest interactionsRequest; + private QueryRequest request; + + public EntityInteractionQueryRequest( + boolean isIncoming, + String entityType, + InteractionsRequest interactionsRequest, + QueryRequest request) { + this.isIncoming = isIncoming; + this.entityType = entityType; + this.interactionsRequest = interactionsRequest; + this.request = request; + } + + public boolean isIncoming() { + return isIncoming; + } + + public String getEntityType() { + return entityType; + } + + public InteractionsRequest getInteractionsRequest() { + return interactionsRequest; + } + + public QueryRequest getRequest() { + return request; + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionQueryResponse.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionQueryResponse.java new file mode 100644 index 00000000..e732dd38 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionQueryResponse.java @@ -0,0 +1,23 @@ +package org.hypertrace.gateway.service.common.datafetcher; + +import java.util.Iterator; +import org.hypertrace.core.query.service.api.ResultSetChunk; + +public class EntityInteractionQueryResponse { + private EntityInteractionQueryRequest request; + private Iterator resultSetChunkIterator; + + public EntityInteractionQueryResponse( + EntityInteractionQueryRequest request, Iterator resultSetChunkIterator) { + this.request = request; + this.resultSetChunkIterator = resultSetChunkIterator; + } + + public EntityInteractionQueryRequest getRequest() { + return request; + } + + public Iterator getResultSetChunkIterator() { + return resultSetChunkIterator; + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcher.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcher.java index 6c88b7d2..612037c5 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcher.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcher.java @@ -18,6 +18,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.StringUtils; @@ -88,14 +90,17 @@ public class EntityInteractionsFetcher { private final QueryServiceClient queryServiceClient; private final int queryServiceRequestTimeout; private final AttributeMetadataProvider metadataProvider; + private final ExecutorService queryExecutor; public EntityInteractionsFetcher( QueryServiceClient queryServiceClient, int qsRequestTimeout, - AttributeMetadataProvider metadataProvider) { + AttributeMetadataProvider metadataProvider, + ExecutorService queryExecutor) { this.queryServiceClient = queryServiceClient; this.queryServiceRequestTimeout = qsRequestTimeout; this.metadataProvider = metadataProvider; + this.queryExecutor = queryExecutor; } private List getEntityIdColumnsFromInteraction( @@ -119,31 +124,79 @@ private List getEntityIdColumnsFromInteraction( } public void populateEntityInteractions( - RequestContext context, EntitiesRequest request, Map entityBuilders) { - // Process the incoming interactions - if (!InteractionsRequest.getDefaultInstance().equals(request.getIncomingInteractions())) { - addInteractions( - context, - request, - entityBuilders, - request.getIncomingInteractions(), - INCOMING, - "fromEntityType filter is mandatory for incoming interactions."); + RequestContext context, + EntitiesRequest entitiesRequest, + Map entityBuilders) { + List allQueryRequests = new ArrayList<>(); + // Process the incoming interactions, and prepare QS queries + if (!InteractionsRequest.getDefaultInstance() + .equals(entitiesRequest.getIncomingInteractions())) { + allQueryRequests.addAll( + prepareQueryRequests( + context, + entitiesRequest, + entityBuilders, + entitiesRequest.getIncomingInteractions(), + INCOMING, + "fromEntityType filter is mandatory for incoming interactions.")); } - // Process the outgoing interactions - if (!InteractionsRequest.getDefaultInstance().equals(request.getOutgoingInteractions())) { - addInteractions( - context, - request, - entityBuilders, - request.getOutgoingInteractions(), - OUTGOING, - "toEntityType filter is mandatory for outgoing interactions."); + // Process the outgoing interactions, and prepare the QS queries + if (!InteractionsRequest.getDefaultInstance() + .equals(entitiesRequest.getOutgoingInteractions())) { + allQueryRequests.addAll( + prepareQueryRequests( + context, + entitiesRequest, + entityBuilders, + entitiesRequest.getOutgoingInteractions(), + OUTGOING, + "toEntityType filter is mandatory for outgoing interactions.")); } + + // execute all the requests in parallel, and wait for results + List> queryRequestCompletableFutures = + allQueryRequests.stream() + .map( + e -> + CompletableFuture.supplyAsync( + () -> executeQueryRequest(context, e), this.queryExecutor)) + .collect(Collectors.toList()); + + // wait and parse result as an when complete + queryRequestCompletableFutures.forEach( + queryRequestCompletableFuture -> { + EntityInteractionQueryResponse qsResponse = queryRequestCompletableFuture.join(); + InteractionsRequest interactionsRequest = + qsResponse.getRequest().getInteractionsRequest(); + + Map metricToAggFunction = + MetricAggregationFunctionUtil.getAggMetricToFunction( + interactionsRequest.getSelectionList()); + + parseResultSet( + entitiesRequest.getEntityType(), + qsResponse.getRequest().getEntityType(), + interactionsRequest.getSelectionList(), + metricToAggFunction, + qsResponse.getResultSetChunkIterator(), + qsResponse.getRequest().isIncoming(), + entityBuilders, + context); + }); } - private void addInteractions( + private EntityInteractionQueryResponse executeQueryRequest( + RequestContext context, EntityInteractionQueryRequest entityInteractionQueryRequest) { + Iterator resultSet = + queryServiceClient.executeQuery( + entityInteractionQueryRequest.getRequest(), + context.getHeaders(), + queryServiceRequestTimeout); + return new EntityInteractionQueryResponse(entityInteractionQueryRequest, resultSet); + } + + private List prepareQueryRequests( RequestContext context, EntitiesRequest request, Map entityIdToBuilders, @@ -168,27 +221,17 @@ private void addInteractions( entityIdToBuilders.keySet(), incoming, context); + if (requests.isEmpty()) { throw new IllegalArgumentException(errorMsg); } - Map metricToAggFunction = - MetricAggregationFunctionUtil.getAggMetricToFunction( - interactionsRequest.getSelectionList()); - for (Map.Entry entry : requests.entrySet()) { - Iterator resultSet = - queryServiceClient.executeQuery( - entry.getValue(), context.getHeaders(), queryServiceRequestTimeout); - parseResultSet( - request.getEntityType(), - entry.getKey(), - interactionsRequest.getSelectionList(), - metricToAggFunction, - resultSet, - incoming, - entityIdToBuilders, - context); - } + return requests.entrySet().stream() + .map( + e -> + new EntityInteractionQueryRequest( + incoming, e.getKey(), interactionsRequest, e.getValue())) + .collect(Collectors.toList()); } private Set getOtherEntityTypes(org.hypertrace.gateway.service.v1.common.Filter filter) { diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java index efc7cdc5..dca77b86 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/entity/EntityService.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; @@ -79,11 +80,12 @@ public EntityService( AttributeMetadataProvider metadataProvider, EntityIdColumnsConfigs entityIdColumnsConfigs, ScopeFilterConfigs scopeFilterConfigs, - LogConfig logConfig) { + LogConfig logConfig, + ExecutorService queryExecutor) { this.metadataProvider = metadataProvider; this.entityIdColumnsConfigs = entityIdColumnsConfigs; this.interactionsFetcher = - new EntityInteractionsFetcher(qsClient, qsRequestTimeout, metadataProvider); + new EntityInteractionsFetcher(qsClient, qsRequestTimeout, metadataProvider, queryExecutor); this.requestPreProcessor = new RequestPreProcessor(metadataProvider, scopeFilterConfigs); this.responsePostProcessor = new ResponsePostProcessor(); this.edsEntityUpdater = new EdsEntityUpdater(edsQueryServiceClient); diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcherTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcherTest.java index c9a82fc7..2d6ad22b 100644 --- a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcherTest.java +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/common/datafetcher/EntityInteractionsFetcherTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.AttributeScope; @@ -93,6 +94,7 @@ public class EntityInteractionsFetcherTest extends AbstractGatewayServiceTest { .build(); @Mock private AttributeMetadataProvider attributeMetadataProvider; + @Mock private ExecutorService queryExecutor; @Test public void testServiceToServiceEdgeQueryRequests() { @@ -113,7 +115,7 @@ public void testServiceToServiceEdgeQueryRequests() { .thenReturn(Optional.of(AttributeMetadata.newBuilder().setId("dummy").build())); EntityInteractionsFetcher aggregator = - new EntityInteractionsFetcher(null, 500, attributeMetadataProvider); + new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor); Map queryRequests = aggregator.buildQueryRequests( request.getStartTimeMillis(), @@ -222,7 +224,7 @@ public void testFromMultipleEntitiesQuery() { .thenReturn(Optional.of(AttributeMetadata.newBuilder().setId("dummy").build())); EntityInteractionsFetcher aggregator = - new EntityInteractionsFetcher(null, 500, attributeMetadataProvider); + new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor); Map queryRequests = aggregator.buildQueryRequests( request.getStartTimeMillis(), @@ -286,7 +288,7 @@ public void testToMultipleEntityTypesQuery() { Optional.of(AttributeMetadata.newBuilder().setId("INTERACTION.startTime").build())); EntityInteractionsFetcher aggregator = - new EntityInteractionsFetcher(null, 500, attributeMetadataProvider); + new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor); Map queryRequests = aggregator.buildQueryRequests( request.getStartTimeMillis(), @@ -405,7 +407,7 @@ public void testEntityWithInteractionMappingToMultipleAttributes() { Optional.of(AttributeMetadata.newBuilder().setId("INTERACTION.startTime").build())); EntityInteractionsFetcher aggregator = - new EntityInteractionsFetcher(null, 500, attributeMetadataProvider); + new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor); LinkedHashSet entityKeys = new LinkedHashSet<>(); entityKeys.add(EntityKey.of("test_name1", "test_type1")); entityKeys.add(EntityKey.of("test_name2", "test_type2")); @@ -505,7 +507,7 @@ public void testCornerCases() { .thenReturn(Optional.of(AttributeMetadata.newBuilder().setId("dummy").build())); EntityInteractionsFetcher aggregator = - new EntityInteractionsFetcher(null, 500, attributeMetadataProvider); + new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor); Map queryRequests = aggregator.buildQueryRequests( request.getStartTimeMillis(), @@ -579,7 +581,7 @@ public void testInvalidRequests() { Mockito.eq("startTime"))) .thenReturn(Optional.of(AttributeMetadata.newBuilder().setFqn("dummy").build())); EntityInteractionsFetcher aggregator = - new EntityInteractionsFetcher(null, 500, attributeMetadataProvider); + new EntityInteractionsFetcher(null, 500, attributeMetadataProvider, queryExecutor); for (EntitiesRequest request : getInvalidRequests()) { try { diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceInteractionRequestTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceInteractionRequestTest.java new file mode 100644 index 00000000..a9c6ddad --- /dev/null +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceInteractionRequestTest.java @@ -0,0 +1,679 @@ +package org.hypertrace.gateway.service.entity; + +import static org.hypertrace.gateway.service.common.QueryServiceRequestAndResponseUtils.createQsAggregationExpression; +import static org.hypertrace.gateway.service.common.QueryServiceRequestAndResponseUtils.createQsDefaultRequestFilter; +import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createAttributeExpression; +import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createCompositeFilter; +import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createFilter; +import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createStringArrayLiteralExpression; +import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createStringNullLiteralExpression; +import static org.hypertrace.gateway.service.common.util.QueryExpressionUtil.buildAttributeExpression; +import static org.hypertrace.gateway.service.common.util.QueryExpressionUtil.getAggregateFunctionExpression; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.hypertrace.core.attribute.service.v1.AttributeKind; +import org.hypertrace.core.attribute.service.v1.AttributeMetadata; +import org.hypertrace.core.attribute.service.v1.AttributeScope; +import org.hypertrace.core.attribute.service.v1.AttributeSource; +import org.hypertrace.core.attribute.service.v1.AttributeType; +import org.hypertrace.core.query.service.api.ColumnMetadata; +import org.hypertrace.core.query.service.api.QueryRequest; +import org.hypertrace.core.query.service.api.ResultSetChunk; +import org.hypertrace.core.query.service.api.ResultSetMetadata; +import org.hypertrace.core.query.service.api.Row; +import org.hypertrace.core.query.service.client.QueryServiceClient; +import org.hypertrace.entity.query.service.client.EntityQueryServiceClient; +import org.hypertrace.gateway.service.AbstractGatewayServiceTest; +import org.hypertrace.gateway.service.common.AttributeMetadataProvider; +import org.hypertrace.gateway.service.common.RequestContext; +import org.hypertrace.gateway.service.common.config.ScopeFilterConfigs; +import org.hypertrace.gateway.service.common.converters.QueryRequestUtil; +import org.hypertrace.gateway.service.entity.config.EntityIdColumnsConfigs; +import org.hypertrace.gateway.service.entity.config.LogConfig; +import org.hypertrace.gateway.service.executor.QueryExecutorConfig; +import org.hypertrace.gateway.service.executor.QueryExecutorServiceFactory; +import org.hypertrace.gateway.service.v1.common.DomainEntityType; +import org.hypertrace.gateway.service.v1.common.Expression; +import org.hypertrace.gateway.service.v1.common.Filter; +import org.hypertrace.gateway.service.v1.common.FunctionType; +import org.hypertrace.gateway.service.v1.common.LiteralConstant; +import org.hypertrace.gateway.service.v1.common.Operator; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.entity.EntitiesRequest; +import org.hypertrace.gateway.service.v1.entity.EntitiesResponse; +import org.hypertrace.gateway.service.v1.entity.EntityInteraction; +import org.hypertrace.gateway.service.v1.entity.InteractionsRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class EntityServiceInteractionRequestTest extends AbstractGatewayServiceTest { + private QueryServiceClient queryServiceClient; + private EntityQueryServiceClient entityQueryServiceClient; + private AttributeMetadataProvider attributeMetadataProvider; + private EntityIdColumnsConfigs entityIdColumnsConfigs; + private LogConfig logConfig; + private ScopeFilterConfigs scopeFilterConfigs; + private ExecutorService queryExecutor; + + @BeforeEach + public void setup() { + super.setup(); + mockEntityIdColumnConfigs(); + queryServiceClient = Mockito.mock(QueryServiceClient.class); + entityQueryServiceClient = Mockito.mock(EntityQueryServiceClient.class); + attributeMetadataProvider = Mockito.mock(AttributeMetadataProvider.class); + mock(attributeMetadataProvider); + logConfig = Mockito.mock(LogConfig.class); + when(logConfig.getQueryThresholdInMillis()).thenReturn(1500L); + scopeFilterConfigs = new ScopeFilterConfigs(ConfigFactory.empty()); + queryExecutor = + QueryExecutorServiceFactory.buildExecutorService( + QueryExecutorConfig.from(this.getConfig())); + } + + @AfterEach + public void clear() { + queryExecutor.shutdown(); + } + + private void mockEntityIdColumnConfigs() { + String entityIdColumnConfigStr = + "entity.idcolumn.config = [\n" + + " {\n" + + " scope = SERVICE\n" + + " key = id\n" + + " },\n" + + " {\n" + + " scope = BACKEND\n" + + " key = id\n" + + " }\n" + + "]"; + Config config = ConfigFactory.parseString(entityIdColumnConfigStr); + entityIdColumnsConfigs = EntityIdColumnsConfigs.fromConfig(config); + } + + private void mock(AttributeMetadataProvider attributeMetadataProvider) { + // interaction related attributes + when(attributeMetadataProvider.getAttributesMetadata( + any(RequestContext.class), eq(AttributeScope.INTERACTION.name()))) + .thenReturn( + Map.of( + "INTERACTION.startTime", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("startTime") + .setFqn("Interaction.start_time_millis") + .setValueKind(AttributeKind.TYPE_INT64) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.startTime") + .build(), + "INTERACTION.fromEntityType", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("fromEntityType") + .setFqn("Interaction.attributes.from_entity_type") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.fromEntityType") + .build(), + "INTERACTION.toEntityType", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("toEntityType") + .setFqn("Interaction.attributes.to_entity_type") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.toEntityType") + .build(), + "INTERACTION.numCalls", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("numCalls") + .setFqn("Interaction.metrics.num_calls") + .setValueKind(AttributeKind.TYPE_INT64) + .setType(AttributeType.METRIC) + .addSources(AttributeSource.QS) + .setId("INTERACTION.numCalls") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.INTERACTION.name()), eq("startTime"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("startTime") + .setFqn("Interaction.start_time_millis") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.startTime") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.INTERACTION.name()), eq("fromEntityType"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("fromEntityType") + .setFqn("Interaction.attributes.from_entity_type") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.fromEntityType") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.INTERACTION.name()), eq("numCalls"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("numCalls") + .setFqn("Interaction.attributes.to_entity_type") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.numCalls") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.INTERACTION.name()), eq("startTime"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.INTERACTION.name()) + .setKey("startTime") + .setFqn("Interaction.start_time_millis") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("INTERACTION.startTime") + .build())); + + // service scope related attributes + when(attributeMetadataProvider.getAttributesMetadata( + any(RequestContext.class), eq(AttributeScope.SERVICE.name()))) + .thenReturn( + Map.of( + "SERVICE.startTime", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.SERVICE.name()) + .setKey("startTime") + .setFqn("SERVICE.startTime") + .setValueKind(AttributeKind.TYPE_INT64) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("SERVICE.startTime") + .build(), + "SERVICE.id", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.SERVICE.name()) + .setKey("id") + .setFqn("SERVICE.id") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .addSources(AttributeSource.EDS) + .setId("SERVICE.id") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.SERVICE.name()), eq("startTime"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.SERVICE.name()) + .setKey("startTime") + .setFqn("SERVICE.start_time_millis") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("SERVICE.startTime") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.SERVICE.name()), eq("id"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.SERVICE.name()) + .setKey("id") + .setFqn("SERVICE.id") + .setValueKind(AttributeKind.TYPE_STRING) + .setId("SERVICE.id") + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .build())); + + // backend related attributes + when(attributeMetadataProvider.getAttributesMetadata( + any(RequestContext.class), eq(AttributeScope.BACKEND.name()))) + .thenReturn( + Map.of( + "BACKEND.startTime", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.BACKEND.name()) + .setKey("startTime") + .setFqn("BACKEND.startTime") + .setValueKind(AttributeKind.TYPE_INT64) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("BACKEND.startTime") + .build(), + "BACKEND.id", + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.BACKEND.name()) + .setKey("id") + .setFqn("BACKEND.id") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .addSources(AttributeSource.EDS) + .setId("BACKEND.id") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.BACKEND.name()), eq("startTime"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.BACKEND.name()) + .setKey("startTime") + .setFqn("BACKEND.start_time_millis") + .setValueKind(AttributeKind.TYPE_STRING) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .setId("BACKEND.startTime") + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.BACKEND.name()), eq("id"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.BACKEND.name()) + .setKey("id") + .setFqn("BACKEND.id") + .setValueKind(AttributeKind.TYPE_STRING) + .setId("BACKEND.id") + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .build())); + + when(attributeMetadataProvider.getAttributeMetadata( + any(RequestContext.class), eq(AttributeScope.EVENT.name()), eq("spaceIds"))) + .thenReturn( + Optional.of( + AttributeMetadata.newBuilder() + .setScopeString(AttributeScope.EVENT.name()) + .setId("EVENT.spaceIds") + .setKey("spaceIds") + .setFqn("EVENT.spaceIds") + .setValueKind(AttributeKind.TYPE_STRING_ARRAY) + .setType(AttributeType.ATTRIBUTE) + .addSources(AttributeSource.QS) + .build())); + } + + private void mockQueryServiceRequestForServiceCount(long startTime, long endTime) { + + org.hypertrace.core.query.service.api.Filter queryServiceFilter = + createQsDefaultRequestFilter("SERVICE.startTime", "SERVICE.id", startTime, endTime); + + QueryRequest expectedQueryRequest = + QueryRequest.newBuilder() + .addSelection(createAttributeExpression("SERVICE.id")) + .addSelection(createQsAggregationExpression("COUNT", "SERVICE.id")) + .setFilter(queryServiceFilter) + .addGroupBy(createAttributeExpression("SERVICE.id")) + .setLimit(QueryServiceClient.DEFAULT_QUERY_SERVICE_GROUP_BY_LIMIT) + .build(); + + when(queryServiceClient.executeQuery(eq(expectedQueryRequest), any(), Mockito.anyInt())) + .thenReturn( + List.of( + ResultSetChunk.newBuilder() + .setResultSetMetadata( + generateResultSetMetadataFor("SERVICE.id", "COUNT_service")) + .addRow(generateRowFor("test_service_1", "10.0")) + .build()) + .iterator()); + } + + private void mockQueryServiceRequestForIncomingServiceInteraction(long startTime, long endTime) { + + org.hypertrace.core.query.service.api.Filter timesFilter = + QueryRequestUtil.createBetweenTimesFilter("INTERACTION.startTime", startTime, endTime); + + org.hypertrace.core.query.service.api.Filter toServiceIdFilter = + QueryRequestUtil.createFilter( + "INTERACTION.toServiceId", + org.hypertrace.core.query.service.api.Operator.IN, + createStringArrayLiteralExpression(List.of("test_service_1"))); + + org.hypertrace.core.query.service.api.Filter fromServiceIdFilter = + QueryRequestUtil.createCompositeFilter( + org.hypertrace.core.query.service.api.Operator.AND, + List.of( + createFilter( + "INTERACTION.fromServiceId", + org.hypertrace.core.query.service.api.Operator.NEQ, + createStringNullLiteralExpression()))); + + org.hypertrace.core.query.service.api.Filter interactionQueryFilter = + createCompositeFilter( + org.hypertrace.core.query.service.api.Operator.AND, + List.of(timesFilter, toServiceIdFilter, fromServiceIdFilter)); + + QueryRequest expectedQueryRequest = + QueryRequest.newBuilder() + .addSelection(createAttributeExpression("INTERACTION.toServiceId")) + .addSelection(createAttributeExpression("INTERACTION.fromServiceId")) + .addSelection( + createQsAggregationExpression("SUM", "INTERACTION.numCalls", "SUM_num_calls")) + .setFilter(interactionQueryFilter) + .addGroupBy(createAttributeExpression("INTERACTION.toServiceId")) + .addGroupBy(createAttributeExpression("INTERACTION.fromServiceId")) + .setLimit(QueryServiceClient.DEFAULT_QUERY_SERVICE_GROUP_BY_LIMIT) + .build(); + + when(queryServiceClient.executeQuery(eq(expectedQueryRequest), any(), Mockito.anyInt())) + .thenReturn( + List.of( + ResultSetChunk.newBuilder() + .setResultSetMetadata( + generateResultSetMetadataFor( + "INTERACTION.toServiceId", + "INTERACTION.fromServiceId", + "SUM_num_calls")) + .addRow(generateRowFor("test_service_1", "from_test_service_1", "40.0")) + .build()) + .iterator()); + } + + private void mockQueryServiceRequestForOutgoingServiceInteraction(long startTime, long endTime) { + + org.hypertrace.core.query.service.api.Filter timesFilter = + QueryRequestUtil.createBetweenTimesFilter("INTERACTION.startTime", startTime, endTime); + + org.hypertrace.core.query.service.api.Filter fromServiceIdFilter = + QueryRequestUtil.createFilter( + "INTERACTION.fromServiceId", + org.hypertrace.core.query.service.api.Operator.IN, + createStringArrayLiteralExpression(List.of("test_service_1"))); + + org.hypertrace.core.query.service.api.Filter toServiceIdFilter = + QueryRequestUtil.createCompositeFilter( + org.hypertrace.core.query.service.api.Operator.AND, + List.of( + createFilter( + "INTERACTION.toServiceId", + org.hypertrace.core.query.service.api.Operator.NEQ, + createStringNullLiteralExpression()))); + + org.hypertrace.core.query.service.api.Filter interactionQueryFilter = + createCompositeFilter( + org.hypertrace.core.query.service.api.Operator.AND, + List.of(timesFilter, fromServiceIdFilter, toServiceIdFilter)); + + QueryRequest expectedQueryRequest = + QueryRequest.newBuilder() + .addSelection(createAttributeExpression("INTERACTION.fromServiceId")) + .addSelection(createAttributeExpression("INTERACTION.toServiceId")) + .addSelection( + createQsAggregationExpression("SUM", "INTERACTION.numCalls", "SUM_num_calls")) + .setFilter(interactionQueryFilter) + .addGroupBy(createAttributeExpression("INTERACTION.fromServiceId")) + .addGroupBy(createAttributeExpression("INTERACTION.toServiceId")) + .setLimit(QueryServiceClient.DEFAULT_QUERY_SERVICE_GROUP_BY_LIMIT) + .build(); + + when(queryServiceClient.executeQuery(eq(expectedQueryRequest), any(), Mockito.anyInt())) + .thenReturn( + List.of( + ResultSetChunk.newBuilder() + .setResultSetMetadata( + generateResultSetMetadataFor( + "INTERACTION.fromServiceId", + "INTERACTION.toServiceId", + "SUM_num_calls")) + .addRow(generateRowFor("test_service_1", "to_test_service_1", "20.0")) + .build()) + .iterator()); + } + + private void mockQueryServiceRequestForOutgoingBackendInteraction(long startTime, long endTime) { + + org.hypertrace.core.query.service.api.Filter timesFilter = + QueryRequestUtil.createBetweenTimesFilter("INTERACTION.startTime", startTime, endTime); + + org.hypertrace.core.query.service.api.Filter fromServiceIdFilter = + QueryRequestUtil.createFilter( + "INTERACTION.fromServiceId", + org.hypertrace.core.query.service.api.Operator.IN, + createStringArrayLiteralExpression(List.of("test_service_1"))); + + org.hypertrace.core.query.service.api.Filter toServiceIdFilter = + QueryRequestUtil.createCompositeFilter( + org.hypertrace.core.query.service.api.Operator.AND, + List.of( + createFilter( + "INTERACTION.toBackendId", + org.hypertrace.core.query.service.api.Operator.NEQ, + createStringNullLiteralExpression()))); + + org.hypertrace.core.query.service.api.Filter interactionQueryFilter = + createCompositeFilter( + org.hypertrace.core.query.service.api.Operator.AND, + List.of(timesFilter, fromServiceIdFilter, toServiceIdFilter)); + + QueryRequest expectedQueryRequest = + QueryRequest.newBuilder() + .addSelection(createAttributeExpression("INTERACTION.fromServiceId")) + .addSelection(createAttributeExpression("INTERACTION.toBackendId")) + .addSelection( + createQsAggregationExpression("SUM", "INTERACTION.numCalls", "SUM_num_calls")) + .setFilter(interactionQueryFilter) + .addGroupBy(createAttributeExpression("INTERACTION.fromServiceId")) + .addGroupBy(createAttributeExpression("INTERACTION.toBackendId")) + .setLimit(QueryServiceClient.DEFAULT_QUERY_SERVICE_GROUP_BY_LIMIT) + .build(); + + when(queryServiceClient.executeQuery(eq(expectedQueryRequest), any(), Mockito.anyInt())) + .thenReturn( + List.of( + ResultSetChunk.newBuilder() + .setResultSetMetadata( + generateResultSetMetadataFor( + "INTERACTION.fromServiceId", + "INTERACTION.toBackendId", + "SUM_num_calls")) + .addRow( + generateRowFor("test_service_1", "to_backend_test_service_1", "30.0")) + .build()) + .iterator()); + } + + private ResultSetMetadata generateResultSetMetadataFor(String... columnNames) { + ResultSetMetadata.Builder builder = ResultSetMetadata.newBuilder(); + Arrays.stream(columnNames) + .forEach( + columnName -> + builder.addColumnMetadata( + ColumnMetadata.newBuilder() + .setColumnName(columnName) + .setValueType(org.hypertrace.core.query.service.api.ValueType.STRING) + .build())); + return builder.build(); + } + + private Row generateRowFor(String... columnValues) { + Row.Builder rowBuilder = Row.newBuilder(); + Arrays.stream(columnValues) + .forEach( + columnValue -> + rowBuilder.addColumn( + org.hypertrace.core.query.service.api.Value.newBuilder() + .setValueType(org.hypertrace.core.query.service.api.ValueType.STRING) + .setString(columnValue))); + return rowBuilder.build(); + } + + private InteractionsRequest buildIncomingInteractionRequest() { + Set entityTypes = ImmutableSet.of("SERVICE"); + + Filter.Builder entityTypeFilter = + Filter.newBuilder() + .setLhs(buildAttributeExpression("INTERACTION.fromEntityType")) + .setOperator(Operator.IN) + .setRhs( + Expression.newBuilder() + .setLiteral( + LiteralConstant.newBuilder() + .setValue( + Value.newBuilder() + .setValueType(ValueType.STRING_ARRAY) + .addAllStringArray(entityTypes)))); + InteractionsRequest fromInteraction = + InteractionsRequest.newBuilder() + .setFilter(entityTypeFilter) + .addSelection(buildAttributeExpression("INTERACTION.fromEntityType", "fromEntityType")) + .addSelection(buildAttributeExpression("INTERACTION.fromEntityId", "fromEntityId")) + .addSelection( + getAggregateFunctionExpression( + "INTERACTION.numCalls", FunctionType.SUM, "SUM_num_calls")) + .build(); + + return fromInteraction; + } + + private InteractionsRequest buildOutgoingInteractionRequest() { + Set entityTypes = ImmutableSet.of("SERVICE", "BACKEND"); + + Filter.Builder entityTypeFilter = + Filter.newBuilder() + .setLhs(buildAttributeExpression("INTERACTION.toEntityType")) + .setOperator(Operator.IN) + .setRhs( + Expression.newBuilder() + .setLiteral( + LiteralConstant.newBuilder() + .setValue( + Value.newBuilder() + .setValueType(ValueType.STRING_ARRAY) + .addAllStringArray(entityTypes)))); + InteractionsRequest toInteraction = + InteractionsRequest.newBuilder() + .setFilter(entityTypeFilter) + .addSelection(buildAttributeExpression("INTERACTION.toEntityType", "toEntityType")) + .addSelection(buildAttributeExpression("INTERACTION.toEntityId", "toEntityId")) + .addSelection( + getAggregateFunctionExpression( + "INTERACTION.numCalls", FunctionType.SUM, "SUM_num_calls")) + .build(); + + return toInteraction; + } + + @Test + public void testGetEntitiesForMultipleTypeInteractionQuery() { + long endTime = System.currentTimeMillis(); + long startTime = endTime - TimeUnit.DAYS.toMillis(30); + + EntitiesRequest request = + EntitiesRequest.newBuilder() + .setEntityType(DomainEntityType.SERVICE.name()) + .setStartTimeMillis(startTime) + .setEndTimeMillis(endTime) + .addSelection(buildAttributeExpression("SERVICE.id")) + .setIncomingInteractions(buildIncomingInteractionRequest()) + .setOutgoingInteractions(buildOutgoingInteractionRequest()) + .build(); + + mockQueryServiceRequestForServiceCount(startTime, endTime); + mockQueryServiceRequestForIncomingServiceInteraction(startTime, endTime); + mockQueryServiceRequestForOutgoingServiceInteraction(startTime, endTime); + mockQueryServiceRequestForOutgoingBackendInteraction(startTime, endTime); + + EntityService entityService = + new EntityService( + queryServiceClient, + 500, + entityQueryServiceClient, + attributeMetadataProvider, + entityIdColumnsConfigs, + scopeFilterConfigs, + logConfig, + queryExecutor); + EntitiesResponse response = entityService.getEntities(TENANT_ID, request, Map.of()); + + // validate we have one incoming edge, and two outgoing edge + assertNotNull(response); + assertEquals(1, response.getTotal()); + assertEquals(1, response.getEntity(0).getIncomingInteractionCount()); + assertEquals(2, response.getEntity(0).getOutgoingInteractionCount()); + + // validate incoming edge + EntityInteraction incomingInteraction = response.getEntity(0).getIncomingInteraction(0); + assertEquals( + "from_test_service_1", + incomingInteraction.getAttributeMap().get("fromEntityId").getString()); + assertEquals( + "SERVICE", incomingInteraction.getAttributeMap().get("fromEntityType").getString()); + assertEquals(40, incomingInteraction.getMetricsMap().get("SUM_num_calls").getValue().getLong()); + + // validate outgoing service edge + EntityInteraction outGoingServiceInteraction = + response.getEntity(0).getOutgoingInteractionList().stream() + .filter( + i -> + i.getAttributeMap().containsKey("toEntityType") + && i.getAttributeMap().get("toEntityType").getString().equals("SERVICE")) + .findFirst() + .orElse(null); + assertNotNull(outGoingServiceInteraction); + assertEquals( + "to_test_service_1", + outGoingServiceInteraction.getAttributeMap().get("toEntityId").getString()); + assertEquals( + 20, outGoingServiceInteraction.getMetricsMap().get("SUM_num_calls").getValue().getLong()); + + // validate outgoing backend edge + EntityInteraction outGoingBackendInteraction = + response.getEntity(0).getOutgoingInteractionList().stream() + .filter( + i -> + i.getAttributeMap().containsKey("toEntityType") + && i.getAttributeMap().get("toEntityType").getString().equals("BACKEND")) + .findFirst() + .orElse(null); + assertNotNull(outGoingBackendInteraction); + assertEquals( + "to_backend_test_service_1", + outGoingBackendInteraction.getAttributeMap().get("toEntityId").getString()); + assertEquals( + 30, outGoingBackendInteraction.getMetricsMap().get("SUM_num_calls").getValue().getLong()); + } +} diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceTest.java index 0ab189d8..aa4228b0 100644 --- a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceTest.java +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/entity/EntityServiceTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; import org.hypertrace.core.attribute.service.v1.AttributeKind; import org.hypertrace.core.attribute.service.v1.AttributeMetadata; import org.hypertrace.core.attribute.service.v1.AttributeScope; @@ -48,6 +49,8 @@ import org.hypertrace.gateway.service.common.config.ScopeFilterConfigs; import org.hypertrace.gateway.service.entity.config.EntityIdColumnsConfigs; import org.hypertrace.gateway.service.entity.config.LogConfig; +import org.hypertrace.gateway.service.executor.QueryExecutorConfig; +import org.hypertrace.gateway.service.executor.QueryExecutorServiceFactory; import org.hypertrace.gateway.service.v1.common.Expression; import org.hypertrace.gateway.service.v1.common.FunctionType; import org.hypertrace.gateway.service.v1.common.LiteralConstant; @@ -57,6 +60,7 @@ import org.hypertrace.gateway.service.v1.entity.EntitiesRequest; import org.hypertrace.gateway.service.v1.entity.EntitiesResponse; import org.hypertrace.gateway.service.v1.entity.Entity; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -69,6 +73,7 @@ public class EntityServiceTest extends AbstractGatewayServiceTest { private AttributeMetadataProvider attributeMetadataProvider; private EntityIdColumnsConfigs entityIdColumnsConfigs; private LogConfig logConfig; + private ExecutorService queryExecutor; @BeforeEach public void setup() { @@ -80,6 +85,14 @@ public void setup() { mock(attributeMetadataProvider); logConfig = Mockito.mock(LogConfig.class); when(logConfig.getQueryThresholdInMillis()).thenReturn(1500L); + queryExecutor = + QueryExecutorServiceFactory.buildExecutorService( + QueryExecutorConfig.from(this.getConfig())); + } + + @AfterEach + public void clear() { + queryExecutor.shutdown(); } private void mockEntityIdColumnConfigs() { @@ -278,7 +291,8 @@ public void testGetEntitiesOnlySelectFromSingleSourceWithTimeRangeShouldUseQuery attributeMetadataProvider, entityIdColumnsConfigs, scopeFilterConfigs, - logConfig); + logConfig, + queryExecutor); EntitiesResponse response = entityService.getEntities(TENANT_ID, entitiesRequest, Map.of()); Assertions.assertNotNull(response); Assertions.assertEquals(2, response.getTotal()); @@ -312,7 +326,8 @@ public void testGetEntitiesOnlySelectFromMultipleSources() { attributeMetadataProvider, entityIdColumnsConfigs, scopeFilterConfigs, - logConfig); + logConfig, + queryExecutor); EntitiesRequest entitiesRequest = EntitiesRequest.newBuilder() .setEntityType("API")