feat: sync latency metrics api
achettyiitr committed Feb 12, 2025
1 parent c708863 commit 5ddb6c0
Showing 9 changed files with 1,178 additions and 146 deletions.
486 changes: 373 additions & 113 deletions proto/warehouse/warehouse.pb.go

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions proto/warehouse/warehouse.proto
@@ -20,6 +20,7 @@ service Warehouse {
rpc RetrieveFailedBatches(RetrieveFailedBatchesRequest) returns (RetrieveFailedBatchesResponse);
rpc RetryFailedBatches(RetryFailedBatchesRequest) returns (RetryFailedBatchesResponse);
rpc GetFirstAbortedUploadInContinuousAbortsByDestination(FirstAbortedUploadInContinuousAbortsByDestinationRequest) returns (FirstAbortedUploadInContinuousAbortsByDestinationResponse);
rpc GetSyncLatency(SyncLatencyRequest) returns (SyncLatencyResponse);
}

message Pagination {
@@ -171,6 +172,23 @@ message FirstAbortedUploadResponse {
google.protobuf.Timestamp last_event_at = 6;
}

message FirstAbortedUploadInContinuousAbortsByDestinationResponse {
repeated FirstAbortedUploadResponse uploads = 1;
}

message SyncLatencyRequest {
string destination_id = 1;
string workspace_id = 2;
string start_time = 3;
string aggregation_minutes = 4;
string source_id = 5;
}

message SyncLatencyResponse {
repeated LatencyTimeSeriesDataPoint time_series_data_points = 1 [json_name = "graph"];
}

message LatencyTimeSeriesDataPoint {
google.protobuf.DoubleValue timestamp_millis = 1 [json_name = "bucket"];
google.protobuf.DoubleValue latency_seconds = 2 [json_name = "latency"];
}
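
Given the json_name overrides above, a protojson-marshalled SyncLatencyResponse would take roughly the following shape. This is an illustrative sketch with made-up values, and it assumes the serving layer uses protojson (which honors json_name); the wrapper types marshal as plain numbers:

{
  "graph": [
    { "bucket": 1739318400000, "latency": 42.5 },
    { "bucket": 1739322000000, "latency": 38.1 }
  ]
}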
37 changes: 37 additions & 0 deletions proto/warehouse/warehouse_grpc.pb.go

Some generated files are not rendered by default.

110 changes: 105 additions & 5 deletions warehouse/api/grpc.go
@@ -50,15 +50,17 @@ import (
)

const (
triggeredSuccessfully         = "Triggered successfully"
noPendingEvents               = "No pending events to sync for this destination"
downloadFileNamePattern       = "downloadfile.*.tmp"
noSuchSync                    = "No such sync exist"
syncFrequencyThresholdMinutes = 30
)

type GRPC struct {
proto.UnimplementedWarehouseServer

conf *config.Config
logger logger.Logger
isMultiWorkspace bool
cpClient cpclient.InternalControlPlane
@@ -80,7 +82,8 @@ type GRPC struct {
userName string
password string
}
enableTunnelling                  bool
defaultLatencyAggForSyncThreshold model.LatencyAggregationType
}
}

@@ -94,6 +97,7 @@ func NewGRPCServer(
triggerStore *sync.Map,
) (*GRPC, error) {
g := &GRPC{
conf: conf,
logger: logger.Child("grpc"),
tenantManager: tenantManager,
bcManager: bcManager,
@@ -125,6 +129,11 @@
return nil, fmt.Errorf("connection token: %w", err)
}

g.config.defaultLatencyAggForSyncThreshold, err = model.GetLatencyAggregationType(conf.GetString("Warehouse.grpc.defaultLatencyAggForSyncThreshold", "p90"))
if err != nil {
return nil, fmt.Errorf("default latency aggregation type: %w", err)
}

labels := map[string]string{}
if g.config.region != "" {
labels["region"] = g.config.region
@@ -1032,3 +1041,94 @@ func (g *GRPC) GetFirstAbortedUploadInContinuousAbortsByDestination(

return &proto.FirstAbortedUploadInContinuousAbortsByDestinationResponse{Uploads: uploads}, nil
}

func (g *GRPC) GetSyncLatency(ctx context.Context, request *proto.SyncLatencyRequest) (*proto.SyncLatencyResponse, error) {
log := g.logger.Withn(
obskit.WorkspaceID(request.WorkspaceId),
obskit.SourceID(request.SourceId),
obskit.DestinationID(request.DestinationId),
logger.NewStringField("startTime", request.GetStartTime()),
logger.NewStringField("aggregationMinutes", request.GetAggregationMinutes()),
)
log.Infon("Getting sync latency")

if request.GetWorkspaceId() == "" || request.GetDestinationId() == "" {
return &proto.SyncLatencyResponse{},
status.Error(codes.Code(code.Code_INVALID_ARGUMENT), "workspaceID and destinationID cannot be empty")
}
if request.GetStartTime() == "" {
return &proto.SyncLatencyResponse{},
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "start time cannot be empty")
}
startTime, err := time.Parse(time.RFC3339, request.GetStartTime())
if err != nil {
return &proto.SyncLatencyResponse{},
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "start time should be in correct %s format", time.RFC3339)
}
aggregationMinutes, err := strconv.Atoi(request.GetAggregationMinutes())
if err != nil {
return &proto.SyncLatencyResponse{},
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "aggregation minutes should be a valid integer")
}

srcMap, _ := g.bcManager.ConnectionSourcesMap(request.GetDestinationId())
if len(request.GetSourceId()) > 0 {
if _, ok := srcMap[request.GetSourceId()]; !ok {
return &proto.SyncLatencyResponse{},
status.Error(codes.Code(code.Code_UNAUTHENTICATED), "unauthorized request")
}
}

aggregationType, err := g.getLatencyAggregationType(srcMap, request.GetSourceId())
if err != nil {
return &proto.SyncLatencyResponse{},
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "unable to get latency aggregation type: %s", err)
}

syncLatencies, err := g.uploadRepo.GetSyncLatencies(ctx, model.SyncLatencyRequest{
WorkspaceID: request.GetWorkspaceId(),
SourceID: request.GetSourceId(),
DestinationID: request.GetDestinationId(),
StartTime: startTime,
AggregationMinutes: int64(aggregationMinutes),
AggregationType: aggregationType,
})
if err != nil {
log.Warnw("unable to get sync latencies", lf.Error, err.Error())
return &proto.SyncLatencyResponse{},
status.Errorf(codes.Code(code.Code_INTERNAL), "unable to get sync latencies: %v", err)
}

resp := &proto.SyncLatencyResponse{
TimeSeriesDataPoints: lo.Map(syncLatencies, func(item model.LatencyTimeSeriesDataPoint, index int) *proto.LatencyTimeSeriesDataPoint {
return &proto.LatencyTimeSeriesDataPoint{
TimestampMillis: wrapperspb.Double(item.TimestampMillis),
LatencySeconds: wrapperspb.Double(item.LatencySeconds),
}
}),
}
return resp, nil
}
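
For context, a minimal Go client sketch for the new endpoint might look like the following. The generated-package import path, dial address, port, and IDs are assumptions for illustration, not part of this commit:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	proto "github.com/rudderlabs/rudder-server/proto/warehouse" // assumed import path
)

func main() {
	// Placeholder address; the real endpoint depends on the deployment.
	conn, err := grpc.NewClient("localhost:7625", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Per the validation in GetSyncLatency above: start_time must be RFC3339
	// and aggregation_minutes a stringified integer.
	resp, err := proto.NewWarehouseClient(conn).GetSyncLatency(ctx, &proto.SyncLatencyRequest{
		WorkspaceId:        "workspace-id",
		DestinationId:      "destination-id",
		StartTime:          time.Now().Add(-24 * time.Hour).UTC().Format(time.RFC3339),
		AggregationMinutes: "60",
	})
	if err != nil {
		log.Fatalf("GetSyncLatency: %v", err)
	}
	for _, p := range resp.GetTimeSeriesDataPoints() {
		fmt.Printf("bucket=%.0f latency=%.2fs\n", p.GetTimestampMillis().GetValue(), p.GetLatencySeconds().GetValue())
	}
}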

func (g *GRPC) getLatencyAggregationType(
srcMap map[string]model.Warehouse, sourceID string,
) (model.LatencyAggregationType, error) {
var syncFrequencies []int
for _, src := range srcMap {
if len(sourceID) > 0 && src.Source.ID != sourceID {
continue
}

freqInMin, err := strconv.ParseInt(src.GetStringDestinationConfig(g.conf, model.SyncFrequencySetting), 10, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse sync frequency: %v", err)
}
syncFrequencies = append(syncFrequencies, int(freqInMin))

Check failure (Code scanning / CodeQL): Incorrect conversion between integer types (High)
Incorrect conversion of a signed 64-bit integer from strconv.ParseInt to a lower bit size type int without an upper bound check.
}

aggregationType := model.MaxLatency
if len(syncFrequencies) > 0 && lo.Min(syncFrequencies) < syncFrequencyThresholdMinutes {
aggregationType = g.config.defaultLatencyAggForSyncThreshold
}
return aggregationType, nil
}
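
The CodeQL finding above flags the unchecked int64-to-int narrowing of the parsed sync frequency inside getLatencyAggregationType, which otherwise selects MaxLatency unless some connected source syncs more often than every 30 minutes, in which case it falls back to the configured default (p90). One way to satisfy the check is an explicit range guard before narrowing; this is a sketch with a hypothetical helper, not part of this commit:

package warehouseutil // illustrative package name

import (
	"fmt"
	"math"
	"strconv"
)

// parseSyncFrequency parses a destination's sync-frequency setting with an
// explicit upper-bound check before narrowing int64 to int, addressing the
// CodeQL warning above. Hypothetical helper, not part of this commit.
func parseSyncFrequency(raw string) (int, error) {
	freqInMin, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("unable to parse sync frequency: %w", err)
	}
	if freqInMin < 0 || freqInMin > math.MaxInt32 {
		return 0, fmt.Errorf("sync frequency %d out of range", freqInMin)
	}
	return int(freqInMin), nil
}

Alternatively, calling strconv.ParseInt(raw, 10, 32) enforces the same bound directly at parse time, since ParseInt errors on values outside the requested bit size.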
