This repository has been archived by the owner on Jun 26, 2024. It is now read-only.

refactor(entity-fetcher): get rid of aggregated metrics query (#66)
* chore: using data fetcher node instead of selection and filter node

* added unit tests

* added debug logs and comments

* optimize filter and order by single source

* remove redundant method

* remove redundant method

* fix optimization for QS

* remove redundant method

* remove redundant utils method

* revert back entity id equals filter

* remove entity id equals filter

* remove redundant code

* added entity response

* added unit tests

* data fetcher node is paginated for non null limit and offset

* got rid of the total entities request

* set equals in unit test method

* removed redundant total entities test

* refactor(entity-fetcher): get rid of aggregated metrics query

* added unit tests
skjindal93 authored Jan 17, 2021
1 parent e486ef8 commit d50c174
Showing 9 changed files with 146 additions and 135 deletions.
```diff
@@ -24,6 +24,10 @@ public void mapAliasToFunctionExpression(String alias, FunctionExpression functionExpression) {
     aliasToFunctionExpressionMap.put(alias, functionExpression);
   }
 
+  public boolean containsFunctionExpression(String alias) {
+    return aliasToFunctionExpressionMap.containsKey(alias);
+  }
+
   public FunctionExpression getFunctionExpressionByAlias(String alias) {
     return aliasToFunctionExpressionMap.get(alias);
   }
```
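
The `containsFunctionExpression` lookup added above is the hinge of the whole refactor: aggregation aliases are registered on the request context while the query is being built, then checked per column while result rows are processed, so a single code path can tell aggregate columns from plain attribute columns. Below is a minimal, runnable sketch of that round trip; the real map value type is the gateway API's `FunctionExpression` (simplified to `String` here), and the alias names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the request context's alias bookkeeping.
class AliasRegistrySketch {
  private final Map<String, String> aliasToFunctionExpressionMap = new HashMap<>();

  void mapAliasToFunctionExpression(String alias, String functionExpression) {
    aliasToFunctionExpressionMap.put(alias, functionExpression);
  }

  boolean containsFunctionExpression(String alias) {
    return aliasToFunctionExpressionMap.containsKey(alias);
  }

  public static void main(String[] args) {
    AliasRegistrySketch context = new AliasRegistrySketch();
    // Query-building side: an aggregation selection registers its alias.
    context.mapAliasToFunctionExpression("AVG_duration", "AVG(duration)");
    // Response-handling side: each result column is dispatched by its alias.
    System.out.println(context.containsFunctionExpression("AVG_duration")); // true  -> metric
    System.out.println(context.containsFunctionExpression("API.name"));     // false -> attribute
  }
}
```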
```diff
@@ -169,12 +169,6 @@ public EntityFetcherResponse getEntities(
     return new EntityFetcherResponse(entityBuilders);
   }
 
-  @Override
-  public EntityFetcherResponse getAggregatedMetrics(
-      EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
-    throw new UnsupportedOperationException("Fetching aggregated metrics not supported by EDS");
-  }
-
   @Override
   public EntityFetcherResponse getTimeAggregatedMetrics(
       EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
```
```diff
@@ -27,16 +27,6 @@ public interface IEntityFetcher {
   EntityFetcherResponse getEntities(
       EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest);
 
-  /**
-   * Get aggregated metrics
-   *
-   * @param requestContext Additional context for the incoming request
-   * @param entitiesRequest encapsulates the aggregated metrics query (selection, filter, order)
-   * @return Map of the Entity Builders keyed by the EntityId
-   */
-  EntityFetcherResponse getAggregatedMetrics(
-      EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest);
-
   /**
    * Get time series data
    *
```
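
Removing the declaration from `IEntityFetcher` means any caller that previously invoked `getAggregatedMetrics` now routes through `getEntities`, as the visitor change further down does. A hypothetical before/after for such a call site (variable names assumed):

```java
// Before this commit (method now removed from the interface):
// EntityFetcherResponse response = entityFetcher.getAggregatedMetrics(context, request);

// After: the general entry point serves aggregations too, as long as the
// request's selection list carries the function expressions.
EntityFetcherResponse response = entityFetcher.getEntities(context, request);
```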
```diff
@@ -157,7 +157,10 @@ public EntityFetcherResponse getEntities(
             i++) {
           ColumnMetadata metadata = chunk.getResultSetMetadata().getColumnMetadata(i);
           org.hypertrace.core.query.service.api.Value columnValue = row.getColumn(i);
-          addEntityAttribute(entityBuilder,
+          buildEntity(
+              entityBuilder,
+              requestContext,
+              entitiesRequest,
               metadata,
               columnValue,
               attributeMetadataMap,
@@ -169,97 +172,6 @@
     return new EntityFetcherResponse(entityBuilders);
   }
 
-  @Override
-  public EntityFetcherResponse getAggregatedMetrics(
-      EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
-    // Only supported filter is entityIds IN ["id1", "id2", "id3"]
-    Map<String, AttributeMetadata> attributeMetadataMap =
-        attributeMetadataProvider.getAttributesMetadata(
-            requestContext, entitiesRequest.getEntityType());
-    entitiesRequestValidator.validate(entitiesRequest, attributeMetadataMap);
-
-    List<org.hypertrace.gateway.service.v1.common.Expression> aggregates =
-        ExpressionReader.getFunctionExpressions(entitiesRequest.getSelectionList().stream());
-    if (aggregates.isEmpty()) {
-      return new EntityFetcherResponse();
-    }
-
-    List<String> entityIdAttributes =
-        AttributeMetadataUtil.getIdAttributeIds(
-            attributeMetadataProvider, entityIdColumnsConfigs, requestContext, entitiesRequest.getEntityType());
-
-    QueryRequest.Builder builder =
-        constructSelectionQuery(requestContext, entitiesRequest, entityIdAttributes, aggregates);
-    adjustLimitAndOffset(builder, entitiesRequest.getLimit(), entitiesRequest.getOffset());
-
-    QueryRequest request = builder.build();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending Aggregated Metrics Request to Query Service ======== \n {}", request);
-    }
-
-    Iterator<ResultSetChunk> resultSetChunkIterator =
-        queryServiceClient.executeQuery(request, requestContext.getHeaders(),
-            requestTimeout);
-
-    // We want to retain the order as returned from the respective source. Hence using a
-    // LinkedHashMap
-    Map<EntityKey, Builder> entityMap = new LinkedHashMap<>();
-
-    while (resultSetChunkIterator.hasNext()) {
-      ResultSetChunk chunk = resultSetChunkIterator.next();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Received chunk: " + chunk.toString());
-      }
-
-      if (chunk.getRowCount() < 1) {
-        break;
-      }
-
-      if (!chunk.hasResultSetMetadata()) {
-        LOG.warn("Chunk doesn't have result metadata so couldn't process the response.");
-        break;
-      }
-
-      for (Row row : chunk.getRowList()) {
-        // Construct the EntityKey from the EntityId attributes columns
-        EntityKey entityKey =
-            EntityKey.of(
-                IntStream.range(0, entityIdAttributes.size())
-                    .mapToObj(value -> row.getColumn(value).getString())
-                    .toArray(String[]::new));
-        Builder entityBuilder = entityMap.computeIfAbsent(entityKey, k -> Entity.newBuilder());
-        entityBuilder.setEntityType(entitiesRequest.getEntityType());
-        entityBuilder.setId(entityKey.toString());
-        // Always include the id in entity since that's needed to make follow up queries in
-        // optimal fashion. If this wasn't really requested by the client, it should be removed
-        // as post processing.
-        for (int i = 0; i < entityIdAttributes.size(); i++) {
-          entityBuilder.putAttribute(
-              entityIdAttributes.get(i),
-              Value.newBuilder()
-                  .setString(entityKey.getAttributes().get(i))
-                  .setValueType(ValueType.STRING)
-                  .build());
-        }
-
-        for (int i = entityIdAttributes.size();
-            i < chunk.getResultSetMetadata().getColumnMetadataCount();
-            i++) {
-          ColumnMetadata metadata = chunk.getResultSetMetadata().getColumnMetadata(i);
-          org.hypertrace.core.query.service.api.Value columnValue = row.getColumn(i);
-          addAggregateMetric(entityBuilder,
-              requestContext,
-              entitiesRequest,
-              metadata,
-              columnValue,
-              attributeMetadataMap);
-        }
-      }
-    }
-    return new EntityFetcherResponse(entityMap);
-  }
-
   private void adjustLimitAndOffset(QueryRequest.Builder builder, int limit, int offset) {
     // If there is more than one groupBy column, we cannot set the same limit that came
     // in the request since that might return less entities than needed when the same
@@ -336,28 +248,51 @@ private QueryRequest.Builder constructSelectionQuery(EntitiesRequestContext requestContext,
     return builder;
   }
 
-  private void addEntityAttribute(Entity.Builder entityBuilder,
+  private void buildEntity(
+      Entity.Builder entityBuilder,
+      QueryRequestContext requestContext,
+      EntitiesRequest entitiesRequest,
       ColumnMetadata metadata,
       org.hypertrace.core.query.service.api.Value columnValue,
       Map<String, AttributeMetadata> attributeMetadataMap,
       boolean isSkipCountColumn) {
 
     // Ignore the count column since we introduced that ourselves into the query
-    if (isSkipCountColumn &&
-        StringUtils.equalsIgnoreCase(COUNT_COLUMN_NAME, metadata.getColumnName())) {
+    if (isSkipCountColumn
+        && StringUtils.equalsIgnoreCase(COUNT_COLUMN_NAME, metadata.getColumnName())) {
       return;
     }
 
+    // aggregate
+    if (requestContext.containsFunctionExpression(metadata.getColumnName())) {
+      addAggregateMetric(
+          entityBuilder,
+          requestContext,
+          entitiesRequest,
+          metadata,
+          columnValue,
+          attributeMetadataMap);
+    } else {
+      // attribute
+      addEntityAttribute(entityBuilder, metadata, columnValue, attributeMetadataMap);
+    }
+  }
+
+  private void addEntityAttribute(
+      Entity.Builder entityBuilder,
+      ColumnMetadata metadata,
+      org.hypertrace.core.query.service.api.Value columnValue,
+      Map<String, AttributeMetadata> attributeMetadataMap) {
+
     String attributeName = metadata.getColumnName();
     entityBuilder.putAttribute(
         attributeName,
         QueryAndGatewayDtoConverter.convertToGatewayValue(
-            attributeName,
-            columnValue,
-            attributeMetadataMap));
+            attributeName, columnValue, attributeMetadataMap));
   }
 
-  private void addAggregateMetric(Entity.Builder entityBuilder,
+  private void addAggregateMetric(
+      Entity.Builder entityBuilder,
       QueryRequestContext requestContext,
       EntitiesRequest entitiesRequest,
       ColumnMetadata metadata,
```
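
The net effect in this fetcher: a single query-service request now returns entity-id columns, attribute columns, and aggregate columns in one result set, and `buildEntity` routes each column by its alias instead of issuing a separate aggregated-metrics query. A runnable sketch of that per-column dispatch; the aliases, the string-valued row, and the two output maps are illustrative stand-ins for the proto types used in the real code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ColumnDispatchSketch {
  public static void main(String[] args) {
    // Aliases registered for function expressions at query-build time.
    Set<String> functionAliases = Set.of("AVG_duration");

    // One row of the combined result set: id, attribute, and aggregate side by side.
    Map<String, String> row = new LinkedHashMap<>();
    row.put("API.id", "id1");
    row.put("API.name", "checkout");
    row.put("AVG_duration", "42.5");

    Map<String, String> attributes = new LinkedHashMap<>();
    Map<String, String> metrics = new LinkedHashMap<>();
    for (Map.Entry<String, String> column : row.entrySet()) {
      if (functionAliases.contains(column.getKey())) {
        metrics.put(column.getKey(), column.getValue()); // addAggregateMetric(...)
      } else {
        attributes.put(column.getKey(), column.getValue()); // addEntityAttribute(...)
      }
    }
    System.out.println("attributes=" + attributes + ", metrics=" + metrics);
  }
}
```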
```diff
@@ -12,10 +12,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -148,6 +146,7 @@ public EntityResponse visit(DataFetcherNode dataFetcherNode) {
     EntitiesRequest.Builder requestBuilder =
         EntitiesRequest.newBuilder(entitiesRequest)
             .clearSelection()
+            .clearTimeAggregation()
             .clearFilter()
             .clearOrderBy()
             .clearLimit()
@@ -243,6 +242,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
     EntitiesRequest request =
         EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
             .clearSelection()
+            .clearTimeAggregation()
             .clearFilter()
             // TODO: Should we push order by, limit and offet down to the data source?
             // If we want to push the order by down, we would also have to divide order by into
@@ -273,6 +273,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
     EntitiesRequest request =
         EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
             .clearSelection()
+            .clearTimeAggregation()
             .clearFilter()
             .clearOrderBy()
             .clearOffset()
@@ -290,7 +291,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
                       request.getEntityType(),
                       executionContext.getTimestampAttributeId(),
                       executionContext.getRequestHeaders());
-              return entityFetcher.getAggregatedMetrics(context, request);
+              return entityFetcher.getEntities(context, request);
             })
         .collect(Collectors.toList()));
     resultMapList.addAll(
@@ -299,6 +300,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
             source -> {
               EntitiesRequest request =
                   EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
                       .clearSelection()
+                      .clearTimeAggregation()
                       .clearFilter()
                       .clearOrderBy()
```
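
Each source-scoped request above is rebuilt by clearing everything source-specific and re-adding only what that source serves; this commit adds `.clearTimeAggregation()` to those resets so a request scoped to plain selections or aggregations no longer carries time-series aggregations it will never execute. A plain-Java illustration of why the reset matters (the proto builder is replaced by a simple class, and the field contents are invented):

```java
import java.util.ArrayList;
import java.util.List;

// Simple stand-in for the EntitiesRequest proto builder, showing only the reset.
class ScopedRequestSketch {
  final List<String> selections = new ArrayList<>(List.of("API.name"));
  final List<String> timeAggregations = new ArrayList<>(List.of("AVG(duration) in 15s buckets"));

  ScopedRequestSketch clearSelection() { selections.clear(); return this; }
  ScopedRequestSketch clearTimeAggregation() { timeAggregations.clear(); return this; }

  public static void main(String[] args) {
    ScopedRequestSketch request = new ScopedRequestSketch();
    // Without clearTimeAggregation(), a source-scoped request would still carry
    // time-series aggregations that this code path never executes.
    request.clearSelection().clearTimeAggregation();
    System.out.println(request.selections + " " + request.timeAggregations); // [] []
  }
}
```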
```diff
@@ -115,6 +115,13 @@ public static AggregatedMetricValue getAggregatedMetricValue(FunctionType functionType,
         .build();
   }
 
+  public static AggregatedMetricValue getAggregatedMetricValue(FunctionType functionType, long value) {
+    return AggregatedMetricValue.newBuilder()
+        .setFunction(functionType)
+        .setValue(Value.newBuilder().setLong(value).setValueType(ValueType.LONG))
+        .build();
+  }
+
   public static Expression getLiteralExpression(long value) {
     return Expression.newBuilder()
         .setLiteral(
```
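
The new overload mirrors the existing variant but builds a LONG-typed value, so tests can assert on integer-valued aggregations such as COUNT without going through a double. Hypothetical usage inside a test method; the proto getters (`getValue()`, `getLong()`, `getValueType()`) are the generated counterparts of the setters used above:

```java
// Hypothetical assertion using the new overload (COUNT assumed long-valued).
AggregatedMetricValue expected = getAggregatedMetricValue(FunctionType.COUNT, 34L);
assert expected.getValue().getValueType() == ValueType.LONG;
assert expected.getValue().getLong() == 34L;
```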
```diff
@@ -176,17 +176,6 @@ public void test_getEntities_WithoutPagination() {
         entityDataServiceEntityFetcher.getEntities(entitiesRequestContext, entitiesRequest).size());
   }
 
-  @Test
-  public void test_getAggregatedMetrics() {
-    assertThrows(
-        UnsupportedOperationException.class,
-        () -> {
-          entityDataServiceEntityFetcher.getAggregatedMetrics(
-              new EntitiesRequestContext(TENANT_ID, 0, 1, "API", "API.startTime", Map.of()),
-              EntitiesRequest.newBuilder().build());
-        });
-  }
-
   @Test
   public void test_getTimeAggregatedMetrics() {
     assertThrows(
```
