This repository was archived by the owner on Jun 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Rishabh
authored
Apr 22, 2021
1 parent
27c0e47
commit 9e7f586
Showing
8 changed files
with
434 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
gateway-service-api/src/main/proto/org/hypertrace/gateway/service/v1/log_event.proto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
syntax = "proto3"; | ||
|
||
option java_multiple_files = true; | ||
option java_package = "org.hypertrace.gateway.service.v1.log.events"; | ||
|
||
package org.hypertrace.gateway.service.v1.log.event; | ||
|
||
import "org/hypertrace/gateway/service/v1/gateway_query.proto"; | ||
|
||
message LogEventsRequest { | ||
// todo change this to nano | ||
sfixed64 start_time_millis = 1; | ||
sfixed64 end_time_millis = 2; | ||
|
||
org.hypertrace.gateway.service.v1.common.Filter filter = 3; | ||
repeated org.hypertrace.gateway.service.v1.common.Expression selection = 4; | ||
repeated org.hypertrace.gateway.service.v1.common.OrderByExpression order_by = 5; | ||
int32 offset = 6; | ||
int32 limit = 7; | ||
} | ||
|
||
message LogEvent { | ||
map<string, org.hypertrace.gateway.service.v1.common.Value> attributes = 1; | ||
} | ||
|
||
message LogEventsResponse { | ||
repeated LogEvent log_events = 1; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
168 changes: 168 additions & 0 deletions
168
...-service-impl/src/main/java/org/hypertrace/gateway/service/logevent/LogEventsService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
package org.hypertrace.gateway.service.logevent; | ||
|
||
import static org.hypertrace.gateway.service.common.util.AttributeMetadataUtil.getTimestampAttributeMetadata; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import io.micrometer.core.instrument.Timer; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
import org.hypertrace.core.attribute.service.v1.AttributeMetadata; | ||
import org.hypertrace.core.query.service.api.ColumnMetadata; | ||
import org.hypertrace.core.query.service.api.QueryRequest; | ||
import org.hypertrace.core.query.service.api.ResultSetChunk; | ||
import org.hypertrace.core.query.service.api.ResultSetMetadata; | ||
import org.hypertrace.core.query.service.api.Row; | ||
import org.hypertrace.core.query.service.client.QueryServiceClient; | ||
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; | ||
import org.hypertrace.gateway.service.common.AttributeMetadataProvider; | ||
import org.hypertrace.gateway.service.common.RequestContext; | ||
import org.hypertrace.gateway.service.common.converters.QueryAndGatewayDtoConverter; | ||
import org.hypertrace.gateway.service.v1.common.OrderByExpression; | ||
import org.hypertrace.gateway.service.v1.log.events.LogEvent; | ||
import org.hypertrace.gateway.service.v1.log.events.LogEventsRequest; | ||
import org.hypertrace.gateway.service.v1.log.events.LogEventsResponse; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class LogEventsService { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(LogEventsService.class); | ||
|
||
private static final String LOG_EVENT_SCOPE = "LOG_EVENT"; | ||
private final QueryServiceClient queryServiceClient; | ||
private final int requestTimeout; | ||
private final AttributeMetadataProvider attributeMetadataProvider; | ||
|
||
private Timer queryExecutionTimer; | ||
|
||
public LogEventsService( | ||
QueryServiceClient queryServiceClient, | ||
int requestTimeout, | ||
AttributeMetadataProvider attributeMetadataProvider) { | ||
this.queryServiceClient = queryServiceClient; | ||
this.requestTimeout = requestTimeout; | ||
this.attributeMetadataProvider = attributeMetadataProvider; | ||
initMetrics(); | ||
} | ||
|
||
private void initMetrics() { | ||
queryExecutionTimer = | ||
PlatformMetricsRegistry.registerTimer( | ||
"hypertrace.log.event.query.execution", ImmutableMap.of()); | ||
} | ||
|
||
public LogEventsResponse getLogEventsByFilter(RequestContext context, LogEventsRequest request) { | ||
Instant start = Instant.now(); | ||
try { | ||
Map<String, AttributeMetadata> attributeMap = | ||
attributeMetadataProvider.getAttributesMetadata(context, LOG_EVENT_SCOPE); | ||
LogEventsResponse.Builder logEventResponseBuilder = LogEventsResponse.newBuilder(); | ||
|
||
List<LogEvent> logEvents = fetchLogEvents(context, request, attributeMap); | ||
|
||
logEventResponseBuilder.addAllLogEvents(logEvents); | ||
|
||
LogEventsResponse response = logEventResponseBuilder.build(); | ||
LOG.debug("Log Event Service Response: {}", response); | ||
return response; | ||
} finally { | ||
queryExecutionTimer.record( | ||
Duration.between(start, Instant.now()).toMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
private List<LogEvent> fetchLogEvents( | ||
RequestContext context, | ||
LogEventsRequest request, | ||
Map<String, AttributeMetadata> attributeMetadataMap) { | ||
|
||
AttributeMetadata timestampAttributeMetadata = | ||
getTimestampAttributeMetadata(attributeMetadataProvider, context, LOG_EVENT_SCOPE); | ||
QueryRequest.Builder queryBuilder = | ||
QueryRequest.newBuilder() | ||
.setFilter( | ||
QueryAndGatewayDtoConverter.addTimeAndSpaceFiltersAndConvertToQueryFilter( | ||
convertFromMillis( | ||
request.getStartTimeMillis(), timestampAttributeMetadata.getUnit()), | ||
convertFromMillis( | ||
request.getEndTimeMillis(), timestampAttributeMetadata.getUnit()), | ||
"", | ||
timestampAttributeMetadata.getId(), | ||
"", | ||
request.getFilter())); | ||
|
||
if (!request.getSelectionList().isEmpty()) { | ||
request | ||
.getSelectionList() | ||
.forEach( | ||
exp -> | ||
queryBuilder.addSelection( | ||
QueryAndGatewayDtoConverter.convertToQueryExpression(exp))); | ||
} | ||
|
||
addSortLimitAndOffset(request, queryBuilder); | ||
|
||
List<LogEvent> logEventResult = new ArrayList<>(); | ||
QueryRequest queryRequest = queryBuilder.build(); | ||
|
||
Iterator<ResultSetChunk> resultSetChunkIterator = | ||
queryServiceClient.executeQuery(queryRequest, context.getHeaders(), requestTimeout); | ||
|
||
ResultSetMetadata resultSetMetadata = null; | ||
while (resultSetChunkIterator.hasNext()) { | ||
ResultSetChunk chunk = resultSetChunkIterator.next(); | ||
|
||
LOG.debug("Received chunk: {}", chunk); | ||
|
||
if (null == resultSetMetadata && chunk.hasResultSetMetadata()) { | ||
resultSetMetadata = chunk.getResultSetMetadata(); | ||
} | ||
|
||
for (Row row : chunk.getRowList()) { | ||
LogEvent.Builder logEventBuilder = LogEvent.newBuilder(); | ||
for (int i = 0; i < resultSetMetadata.getColumnMetadataCount(); i++) { | ||
ColumnMetadata metadata = resultSetMetadata.getColumnMetadata(i); | ||
String attrName = metadata.getColumnName(); | ||
logEventBuilder.putAttributes( | ||
metadata.getColumnName(), | ||
QueryAndGatewayDtoConverter.convertToGatewayValue( | ||
attrName, row.getColumn(i), attributeMetadataMap)); | ||
} | ||
|
||
logEventResult.add(logEventBuilder.build()); | ||
} | ||
} | ||
return logEventResult; | ||
} | ||
|
||
// Adds the sort, limit and offset information to the QueryService if it is requested | ||
private void addSortLimitAndOffset(LogEventsRequest request, QueryRequest.Builder queryBuilder) { | ||
if (request.getOrderByCount() > 0) { | ||
List<OrderByExpression> orderByExpressions = request.getOrderByList(); | ||
queryBuilder.addAllOrderBy( | ||
QueryAndGatewayDtoConverter.convertToQueryOrderByExpressions(orderByExpressions)); | ||
} | ||
|
||
int limit = request.getLimit(); | ||
if (limit > 0) { | ||
queryBuilder.setLimit(limit); | ||
} | ||
|
||
int offset = request.getOffset(); | ||
if (offset > 0) { | ||
queryBuilder.setOffset(offset); | ||
} | ||
} | ||
|
||
private static long convertFromMillis(long timestamp, String toUnit) { | ||
if ("ns".equals(toUnit)) { | ||
return Duration.ofMillis(timestamp).toNanos(); | ||
} | ||
return timestamp; | ||
} | ||
} |
Oops, something went wrong.