This repository has been archived by the owner on Jun 26, 2024. It is now read-only.

Commit

perf(entities): fetch total directly (#77)
* chore(entities-api): flag to fetch total entities

* snake case for proto fields

* perf(entities): fetch total entities, only if requested

* perf(entities): fetch total directly

* added unit tests

* upgraded entity service dependencies

* added unit tests
skjindal93 authored Jan 28, 2021
1 parent 77eebec commit 46d7899
Showing 13 changed files with 341 additions and 151 deletions.
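At a glance, the change replaces "fetch every matching entity key and count it in the gateway" with a direct total lookup: the query-service-backed fetcher issues a DISTINCTCOUNT aggregation on the entity id attribute, and the entity-data-service-backed fetcher calls the new total() RPC. A minimal sketch of the latter path, pieced together from the getTotal() implementation added below (the wrapper method, parameter names, and the Map<String, String> header type are illustrative, not part of the commit; imports come from the entity-service-api/client dependencies bumped in this change):

// Sketch only, mirroring the getTotal() added to the entity-data-service fetcher.
long fetchTotal(
    EntityQueryServiceClient entityQueryServiceClient,
    String entityType,
    Filter entityServiceFilter, // org.hypertrace.entity.query.service.v1.Filter
    Map<String, String> headers) {
  TotalEntitiesRequest totalEntitiesRequest =
      TotalEntitiesRequest.newBuilder()
          .setEntityType(entityType)
          .setFilter(entityServiceFilter)
          .build();
  // one round trip: the entity service counts matching entities itself instead of
  // streaming every entity key back for the gateway to count
  return entityQueryServiceClient.total(totalEntitiesRequest, headers).getTotal();
}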
4 changes: 2 additions & 2 deletions gateway-service-impl/build.gradle.kts
@@ -25,8 +25,8 @@ dependencies {

implementation("org.hypertrace.core.query.service:query-service-client:0.5.2")
implementation("org.hypertrace.core.attribute.service:attribute-service-client:0.9.3")
implementation("org.hypertrace.entity.service:entity-service-client:0.5.0")
implementation("org.hypertrace.entity.service:entity-service-api:0.5.0")
implementation("org.hypertrace.entity.service:entity-service-client:0.5.6")
implementation("org.hypertrace.entity.service:entity-service-api:0.5.6")
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.3.2")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.19")

@@ -18,6 +18,9 @@ public class QueryRequestUtil {

public static final String DATE_TIME_CONVERTER = "dateTimeConvert";

private static final String COUNT_FUNCTION_NAME = "COUNT";
private static final String DISTINCTCOUNT_FUNCTION_NAME = "DISTINCTCOUNT";

public static Filter createBetweenTimesFilter(String columnName, long lower, long higher) {
return Filter.newBuilder()
.setOperator(Operator.AND)
@@ -102,7 +105,16 @@ public static Expression createCountByColumnSelection(String columnName) {
return Expression.newBuilder()
.setFunction(
Function.newBuilder()
.setFunctionName("COUNT")
.setFunctionName(COUNT_FUNCTION_NAME)
.addArguments(createColumnExpression(columnName)))
.build();
}

public static Expression createDistinctCountByColumnSelection(String columnName) {
return Expression.newBuilder()
.setFunction(
Function.newBuilder()
.setFunctionName(DISTINCTCOUNT_FUNCTION_NAME)
.addArguments(createColumnExpression(columnName)))
.build();
}
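For context, a count-of-distinct-values query built with the new helper would look roughly like the following sketch, with "SERVICE.id" and the filter standing in for whatever entity id attribute and filter the caller supplies (the real usage is in the query-service fetcher's getTotal() further down):

// Sketch only: the sole selection is the DISTINCTCOUNT expression; the query service
// returns a single-column row whose string value the caller parses as a long.
QueryRequest totalQuery =
    QueryRequest.newBuilder()
        .addSelection(createDistinctCountByColumnSelection("SERVICE.id")) // placeholder attribute id
        .setFilter(filterBuilder) // placeholder filter
        .build();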
@@ -11,6 +11,8 @@
import org.hypertrace.entity.query.service.v1.EntityQueryRequest;
import org.hypertrace.entity.query.service.v1.ResultSetChunk;
import org.hypertrace.entity.query.service.v1.Row;
import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest;
import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse;
import org.hypertrace.gateway.service.common.AttributeMetadataProvider;
import org.hypertrace.gateway.service.common.converters.EntityServiceAndGatewayServiceConverter;
import org.hypertrace.gateway.service.common.util.AttributeMetadataUtil;
@@ -50,7 +52,10 @@ public EntityFetcherResponse getEntities(
EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
List<String> mappedEntityIdAttributeIds =
AttributeMetadataUtil.getIdAttributeIds(
attributeMetadataProvider, entityIdColumnsConfigs, requestContext, entitiesRequest.getEntityType());
attributeMetadataProvider,
entityIdColumnsConfigs,
requestContext,
entitiesRequest.getEntityType());
EntityQueryRequest.Builder builder =
EntityQueryRequest.newBuilder()
.setEntityType(entitiesRequest.getEntityType())
@@ -174,4 +179,35 @@ public EntityFetcherResponse getTimeAggregatedMetrics(
EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
throw new UnsupportedOperationException("Fetching time series data not supported by EDS");
}

@Override
public long getTotal(EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
EntityQueryRequest.Builder builder =
EntityQueryRequest.newBuilder()
.setEntityType(entitiesRequest.getEntityType())
.setFilter(
EntityServiceAndGatewayServiceConverter.convertToEntityServiceFilter(
entitiesRequest.getFilter()));

// add time filter for supported scope
EntityServiceAndGatewayServiceConverter.addBetweenTimeFilter(
entitiesRequest.getStartTimeMillis(),
entitiesRequest.getEndTimeMillis(),
attributeMetadataProvider,
entitiesRequest,
builder,
requestContext);

EntityQueryRequest entityQueryRequest = builder.build();

TotalEntitiesRequest totalEntitiesRequest =
TotalEntitiesRequest.newBuilder()
.setEntityType(entitiesRequest.getEntityType())
.setFilter(entityQueryRequest.getFilter())
.build();

TotalEntitiesResponse response =
entityQueryServiceClient.total(totalEntitiesRequest, requestContext.getHeaders());
return response.getTotal();
}
}
@@ -1,30 +1,24 @@
package org.hypertrace.gateway.service.common.datafetcher;

import org.hypertrace.gateway.service.entity.EntityKey;

import java.util.HashSet;
import java.util.Set;

public class EntityResponse {
private final EntityFetcherResponse entityFetcherResponse;
// set of entity keys, irrespective of offset and limit
private final Set<EntityKey> entityKeys;
private final long total;

public EntityResponse() {
this.entityFetcherResponse = new EntityFetcherResponse();
this.entityKeys = new HashSet<>();
this.total = 0;
}

public EntityResponse(EntityFetcherResponse entityFetcherResponse, Set<EntityKey> entityKeys) {
public EntityResponse(EntityFetcherResponse entityFetcherResponse, long total) {
this.entityFetcherResponse = entityFetcherResponse;
this.entityKeys = entityKeys;
this.total = total;
}

public EntityFetcherResponse getEntityFetcherResponse() {
return entityFetcherResponse;
}

public Set<EntityKey> getEntityKeys() {
return entityKeys;
public long getTotal() {
return total;
}
}
@@ -3,10 +3,10 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

import org.hypertrace.gateway.service.entity.EntitiesRequestContext;
import org.hypertrace.gateway.service.entity.EntityKey;
import org.hypertrace.gateway.service.v1.common.Filter;
import org.hypertrace.gateway.service.v1.common.Interval;
import org.hypertrace.gateway.service.v1.common.MetricSeries;
import org.hypertrace.gateway.service.v1.entity.EntitiesRequest;
@@ -46,4 +46,6 @@ default MetricSeries getSortedMetricSeries(MetricSeries.Builder builder) {
.addAllValue(sortedIntervals)
.build();
}

long getTotal(EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest);
}
@@ -2,6 +2,7 @@

import static org.hypertrace.gateway.service.common.converters.QueryAndGatewayDtoConverter.convertToQueryExpression;
import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createCountByColumnSelection;
import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createDistinctCountByColumnSelection;
import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createFilter;
import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createStringNullLiteralExpression;
import static org.hypertrace.gateway.service.common.converters.QueryRequestUtil.createTimeColumnGroupByExpression;
@@ -488,6 +489,66 @@ public EntityFetcherResponse getTimeAggregatedMetrics(
return new EntityFetcherResponse(resultMap);
}

@Override
public long getTotal(EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
Map<String, AttributeMetadata> attributeMetadataMap =
attributeMetadataProvider.getAttributesMetadata(
requestContext, entitiesRequest.getEntityType());
// Validate EntitiesRequest
entitiesRequestValidator.validate(entitiesRequest, attributeMetadataMap);

List<String> entityIdAttributes =
AttributeMetadataUtil.getIdAttributeIds(
attributeMetadataProvider,
entityIdColumnsConfigs,
requestContext,
entitiesRequest.getEntityType());

Filter.Builder filterBuilder =
constructQueryServiceFilter(entitiesRequest, requestContext, entityIdAttributes);

QueryRequest queryRequest =
QueryRequest.newBuilder()
.addSelection(
createDistinctCountByColumnSelection(
Optional.ofNullable(entityIdAttributes.get(0)).orElseThrow()))
.setFilter(filterBuilder)
.build();

if (LOG.isDebugEnabled()) {
LOG.debug("Sending Query to Query Service ======== \n {}", queryRequest);
}

Iterator<ResultSetChunk> resultSetChunkIterator =
queryServiceClient.executeQuery(queryRequest, requestContext.getHeaders(),
requestTimeout);

while (resultSetChunkIterator.hasNext()) {
ResultSetChunk chunk = resultSetChunkIterator.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Received chunk: " + chunk.toString());
}

if (chunk.getRowCount() < 1) {
break;
}

for (Row row : chunk.getRowList()) {
// only DISTINCTCOUNT column is requested as a selection
if (row.getColumnList().size() != 1) {
break;
}

return Long.parseLong(row.getColumn(0).getString());
}
}

LOG.error(
"Unable to query total number of entities from query service for query request: {}",
queryRequest);
return 0;
}

private QueryRequest buildTimeSeriesQueryRequest(
EntitiesRequest entitiesRequest,
EntitiesRequestContext context,
@@ -144,7 +144,6 @@ public EntitiesResponse getEntities(
executionTree.acceptVisitor(new ExecutionVisitor(executionContext, EntityQueryHandlerRegistry.get()));

EntityFetcherResponse entityFetcherResponse = response.getEntityFetcherResponse();
Set<EntityKey> allEntityKeys = response.getEntityKeys();

List<Entity.Builder> results =
this.responsePostProcessor.transform(
@@ -157,7 +156,7 @@ public EntitiesResponse getEntities(
}

EntitiesResponse.Builder responseBuilder =
EntitiesResponse.newBuilder().setTotal(allEntityKeys.size());
EntitiesResponse.newBuilder().setTotal(Long.valueOf(response.getTotal()).intValue());

results.forEach(e -> responseBuilder.addEntity(e.build()));

@@ -17,7 +17,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.common.collect.Sets;
import org.hypertrace.gateway.service.common.datafetcher.EntityFetcherResponse;
import org.hypertrace.gateway.service.common.datafetcher.EntityResponse;
import org.hypertrace.gateway.service.common.datafetcher.IEntityFetcher;
@@ -90,13 +89,8 @@ protected static EntityResponse intersect(List<EntityResponse> entityResponses)
entityResponses.parallelStream()
.map(EntityResponse::getEntityFetcherResponse)
.collect(Collectors.toList()));
Set<EntityKey> entityKeys =
entityResponses.parallelStream()
.map(EntityResponse::getEntityKeys)
.reduce(Sets::intersection)
.orElse(Collections.emptySet());

return new EntityResponse(entityFetcherResponse, entityKeys);
return new EntityResponse(entityFetcherResponse, entityFetcherResponse.size());
}

private static EntityFetcherResponse unionEntities(List<EntityFetcherResponse> builders) {
@@ -121,13 +115,8 @@ protected static EntityResponse union(List<EntityResponse> entityResponses) {
entityResponses.parallelStream()
.map(EntityResponse::getEntityFetcherResponse)
.collect(Collectors.toList()));
Set<EntityKey> entityKeys =
entityResponses.parallelStream()
.map(EntityResponse::getEntityKeys)
.reduce(Sets::union)
.orElse(Collections.emptySet());

return new EntityResponse(entityFetcherResponse, entityKeys);
return new EntityResponse(entityFetcherResponse, entityFetcherResponse.size());
}

@Override
@@ -175,27 +164,16 @@ public EntityResponse visit(DataFetcherNode dataFetcherNode) {
// if the data fetcher node is fetching paginated records and the client has requested for
// total, the total number of entities has to be fetched separately
if (dataFetcherNode.canFetchTotal()) {
EntitiesRequest totalEntitiesRequest =
EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
.clearSelection()
.clearTimeAggregation()
.clearOrderBy()
.clearLimit()
.setOffset(0)
.setFilter(dataFetcherNode.getFilter())
.build();

// since, the pagination is pushed down to the data store, total can be requested directly
// from the data store
return new EntityResponse(
entityFetcher.getEntities(context, request),
entityFetcher
.getEntities(context, totalEntitiesRequest)
.getEntityKeyBuilderMap()
.keySet());
entityFetcher.getTotal(context, entitiesRequest));
} else {
// if the data fetcher node is not paginating, the total number of entities is equal to number
// of records fetched
EntityFetcherResponse response = entityFetcher.getEntities(context, request);
return new EntityResponse(response, response.getEntityKeyBuilderMap().keySet());
return new EntityResponse(response, response.getEntityKeyBuilderMap().size());
}
}

@@ -328,14 +306,14 @@ public EntityResponse visit(SelectionNode selectionNode) {
.reduce(childEntityFetcherResponse, (r1, r2) -> unionEntities(Arrays.asList(r1, r2)));

if (!childEntityFetcherResponse.isEmpty()) {
// if the child fetcher response is non empty, the total set of entity keys
// if the child fetcher response is non empty, the total
// has already been fetched by node below it.
// Could be DataFetcherNode or a child SelectionNode
return new EntityResponse(response, childNodeResponse.getEntityKeys());
return new EntityResponse(response, childNodeResponse.getTotal());
} else {
// if the child fetcher response is empty, the total set of entity keys
// if the child fetcher response is empty, the total
// is equal to the response fetched by the current SelectionNode
return new EntityResponse(response, response.getEntityKeyBuilderMap().keySet());
return new EntityResponse(response, response.size());
}
}

@@ -413,7 +391,7 @@ public EntityResponse visit(SortAndPaginateNode sortAndPaginateNode) {
Map<EntityKey, Builder> linkedHashMap = new LinkedHashMap<>();
sortedList.forEach(entry -> linkedHashMap.put(entry.getKey(), entry.getValue()));
return new EntityResponse(
new EntityFetcherResponse(linkedHashMap), childNodeResponse.getEntityKeys());
new EntityFetcherResponse(linkedHashMap), childNodeResponse.getTotal());
}

@Override
@@ -442,6 +420,6 @@ public EntityResponse visit(PaginateOnlyNode paginateOnlyNode) {
sortedList.forEach(entry -> linkedHashMap.put(entry.getKey(), entry.getValue()));

return new EntityResponse(
new EntityFetcherResponse(linkedHashMap), childNodeResponse.getEntityKeys());
new EntityFetcherResponse(linkedHashMap), childNodeResponse.getTotal());
}
}
@@ -157,7 +157,7 @@ public static void compareEntityResponses(EntityResponse expectedEntityResponse,
expectedEntityResponse.getEntityFetcherResponse(),
actualEntityResponse.getEntityFetcherResponse());

assertEquals(expectedEntityResponse.getEntityKeys(), actualEntityResponse.getEntityKeys());
assertEquals(expectedEntityResponse.getTotal(), actualEntityResponse.getTotal());
}

public static Filter getTimeRangeFilter(String colName, long startTime, long endTime) {