diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index 9794abd0137..afc6f3af69a 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -7,6 +7,7 @@ import ( "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/cache" basechr "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/interfaces" ) @@ -27,8 +28,10 @@ func NewDataConnector( cluster string, useLogsNewSchema bool, useTraceNewSchema bool, + fluxIntervalForTraceDetail time.Duration, + cache cache.Cache, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema) + ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 1e3382e35c8..666be0e940d 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -71,18 +71,19 @@ type ServerOptions struct { HTTPHostPort string PrivateHostPort string // alert specific params - DisableRules bool - RuleRepoURL string - PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration - CacheConfigPath string - FluxInterval string - Cluster string - GatewayUrl string - UseLogsNewSchema bool - UseTraceNewSchema bool + DisableRules bool + RuleRepoURL string + PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration + CacheConfigPath string + FluxInterval string + FluxIntervalForTraceDetail string + Cluster string + GatewayUrl string + UseLogsNewSchema bool + UseTraceNewSchema bool } // Server runs HTTP api service @@ -145,6 +146,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { modelDao.SetFlagProvider(lm) readerReady := make(chan bool) + fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) + if err != nil { + return nil, err + } + var reader interfaces.DataConnector storage := os.Getenv("STORAGE") if storage == "clickhouse" { @@ -159,6 +165,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.Cluster, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, + fluxIntervalForTraceDetail, + serverOptions.SigNoz.Cache, ) go qb.Start(readerReady) reader = qb @@ -250,7 +258,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey) fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) - if err != nil { return nil, err } diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 9a681f395ee..52a34c1b739 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -99,7 +99,7 @@ func main() { var useLogsNewSchema bool var useTraceNewSchema bool - var cacheConfigPath, fluxInterval string + var cacheConfigPath, fluxInterval, fluxIntervalForTraceDetail string var enableQueryServiceLogOTLPExport bool var preferSpanMetrics bool @@ -121,6 +121,7 @@ func main() { flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to 
exclude data from being cached to avoid incorrect cache for data in motion)") + flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')") flag.StringVar(&gatewayUrl, "gateway-url", "", "(url to the gateway)") @@ -151,24 +152,25 @@ func main() { } serverOptions := &app.ServerOptions{ - Config: config, - SigNoz: signoz, - HTTPHostPort: baseconst.HTTPHostPort, - PromConfigPath: promConfigPath, - SkipTopLvlOpsPath: skipTopLvlOpsPath, - PreferSpanMetrics: preferSpanMetrics, - PrivateHostPort: baseconst.PrivateHostPort, - DisableRules: disableRules, - RuleRepoURL: ruleRepoURL, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - DialTimeout: dialTimeout, - CacheConfigPath: cacheConfigPath, - FluxInterval: fluxInterval, - Cluster: cluster, - GatewayUrl: gatewayUrl, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, + Config: config, + SigNoz: signoz, + HTTPHostPort: baseconst.HTTPHostPort, + PromConfigPath: promConfigPath, + SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferSpanMetrics: preferSpanMetrics, + PrivateHostPort: baseconst.PrivateHostPort, + DisableRules: disableRules, + RuleRepoURL: ruleRepoURL, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, + FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, + Cluster: cluster, + GatewayUrl: gatewayUrl, + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, } // Read the jwt secret key diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 0a8a5acead2..89f8c0536de 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -33,6 +33,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/cache" promModel "github.com/prometheus/common/model" "go.uber.org/zap" @@ -41,6 +42,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/logs" "go.signoz.io/signoz/pkg/query-service/app/resource" "go.signoz.io/signoz/pkg/query-service/app/services" + "go.signoz.io/signoz/pkg/query-service/app/traces/tracedetail" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" @@ -156,6 +158,9 @@ type ClickHouseReader struct { traceLocalTableName string traceResourceTableV3 string traceSummaryTable string + + fluxIntervalForTraceDetail time.Duration + cache cache.Cache } // NewTraceReader returns a TraceReader for the database @@ -169,6 +174,8 @@ func NewReader( cluster string, useLogsNewSchema bool, useTraceNewSchema bool, + fluxIntervalForTraceDetail time.Duration, + cache cache.Cache, ) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") @@ -179,7 +186,7 @@ func NewReader( zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) } - return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema) + return NewReaderFromClickhouseConnection(db, options, localDB, 
configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) } func NewReaderFromClickhouseConnection( @@ -191,6 +198,8 @@ func NewReaderFromClickhouseConnection( cluster string, useLogsNewSchema bool, useTraceNewSchema bool, + fluxIntervalForTraceDetail time.Duration, + cache cache.Cache, ) *ClickHouseReader { alertManager, err := am.New() if err != nil { @@ -277,6 +286,9 @@ func NewReaderFromClickhouseConnection( traceTableName: traceTableName, traceResourceTableV3: options.primary.TraceResourceTableV3, traceSummaryTable: options.primary.TraceSummaryTable, + + fluxIntervalForTraceDetail: fluxIntervalForTraceDetail, + cache: cache, } } @@ -1442,6 +1454,372 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc return &searchSpansResult, nil } +func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string, traceDetailsQuery string) ([]model.SpanItemV2, *model.ApiError) { + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return []model.SpanItemV2{}, nil + } + zap.L().Error("Error in processing trace summary sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err)) + } + + var searchScanResponses []model.SpanItemV2 + queryStartTime := time.Now() + err = r.db.Select(ctx, &searchScanResponses, traceDetailsQuery, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + zap.L().Info(traceDetailsQuery) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) + } + zap.L().Info("trace details query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) + + return searchScanResponses, nil +} + +func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) { + cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache) + cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false) + if err != nil { + zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err), zap.String("traceID", traceID)) + return nil, err + } + + if cacheStatus != cache.RetrieveStatusHit { + return nil, errors.Errorf("cache status for getWaterfallSpansForTraceWithMetadata : %s, traceID: %s", cacheStatus, traceID) + } + + if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + zap.L().Info("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache", zap.String("traceID", traceID)) + return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache, traceID: %s", traceID) + } + + zap.L().Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) + return cachedTraceData, nil +} + +func (r *ClickHouseReader) 
GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) { + response := new(model.GetWaterfallSpansForTraceWithMetadataResponse) + var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64 + var spanIdToSpanNodeMap = map[string]*model.Span{} + var traceRoots []*model.Span + var serviceNameToTotalDurationMap = map[string]uint64{} + var serviceNameIntervalMap = map[string][]tracedetail.Interval{} + + cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID) + if err == nil { + startTime = cachedTraceData.StartTime + endTime = cachedTraceData.EndTime + durationNano = cachedTraceData.DurationNano + spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap + serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap + traceRoots = cachedTraceData.TraceRoots + totalSpans = cachedTraceData.TotalSpans + totalErrorSpans = cachedTraceData.TotalErrorSpans + } + + if err != nil { + zap.L().Info("cache miss for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) + + searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName)) + if err != nil { + return nil, err + } + if len(searchScanResponses) == 0 { + return response, nil + } + totalSpans = uint64(len(searchScanResponses)) + + processingBeforeCache := time.Now() + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("getWaterfallSpansForTraceWithMetadata: error unmarshalling references", zap.Error(err), zap.String("traceID", traceID)) + return nil, model.BadRequest(fmt.Errorf("getWaterfallSpansForTraceWithMetadata: error unmarshalling references %w", err)) + } + + // merge attributes_number and attributes_bool to attributes_string + for k, v := range item.Attributes_bool { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Attributes_number { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Resources_string { + item.Attributes_string[k] = v + } + + jsonItem := model.Span{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + Kind: int32(item.Kind), + DurationNano: item.DurationNano, + HasError: item.HasError, + StatusMessage: item.StatusMessage, + StatusCodeString: item.StatusCodeString, + SpanKind: item.SpanKind, + References: ref, + Events: item.Events, + TagMap: item.Attributes_string, + Children: make([]*model.Span, 0), + } + + // convert start timestamp to millis + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + + // collect the intervals for service for execution time calculation + serviceNameIntervalMap[jsonItem.ServiceName] = + append(serviceNameIntervalMap[jsonItem.ServiceName], tracedetail.Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano / 1000000, Service: jsonItem.ServiceName}) + + // append to the span node map + spanIdToSpanNodeMap[jsonItem.SpanID] = 
&jsonItem
+
+			// metadata calculation
+			if startTime == 0 || jsonItem.TimeUnixNano < startTime {
+				startTime = jsonItem.TimeUnixNano
+			}
+			if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime {
+				endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000)
+			}
+			if durationNano == 0 || jsonItem.DurationNano > durationNano {
+				durationNano = jsonItem.DurationNano
+			}
+			if jsonItem.HasError {
+				totalErrorSpans = totalErrorSpans + 1
+			}
+		}
+
+		// traverse through the map and append each node to the children array of the parent node
+		// and add the missing spans
+		for _, spanNode := range spanIdToSpanNodeMap {
+			hasParentSpanNode := false
+			for _, reference := range spanNode.References {
+				if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
+					hasParentSpanNode = true
+
+					if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
+						parentNode.Children = append(parentNode.Children, spanNode)
+					} else {
+						// insert the missing span
+						missingSpan := model.Span{
+							SpanID:           reference.SpanId,
+							TraceID:          spanNode.TraceID,
+							ServiceName:      "",
+							Name:             "Missing Span",
+							TimeUnixNano:     spanNode.TimeUnixNano,
+							Kind:             0,
+							DurationNano:     spanNode.DurationNano,
+							HasError:         false,
+							StatusMessage:    "",
+							StatusCodeString: "",
+							SpanKind:         "",
+							Children:         make([]*model.Span, 0),
+						}
+						missingSpan.Children = append(missingSpan.Children, spanNode)
+						spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
+						traceRoots = append(traceRoots, &missingSpan)
+					}
+				}
+			}
+			if !hasParentSpanNode && !tracedetail.ContainsWaterfallSpan(traceRoots, spanNode) {
+				traceRoots = append(traceRoots, spanNode)
+			}
+		}
+
+		// sort the trace roots to add missing spans at the right order
+		sort.Slice(traceRoots, func(i, j int) bool {
+			if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
+				return traceRoots[i].Name < traceRoots[j].Name
+			}
+			return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
+		})
+
+		serviceNameToTotalDurationMap = tracedetail.CalculateServiceTime(serviceNameIntervalMap)
+
+		traceCache := model.GetWaterfallSpansForTraceWithMetadataCache{
+			StartTime:                     startTime,
+			EndTime:                       endTime,
+			DurationNano:                  durationNano,
+			TotalSpans:                    totalSpans,
+			TotalErrorSpans:               totalErrorSpans,
+			SpanIdToSpanNodeMap:           spanIdToSpanNodeMap,
+			ServiceNameToTotalDurationMap: serviceNameToTotalDurationMap,
+			TraceRoots:                    traceRoots,
+		}
+
+		zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
+		cacheErr := r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5)
+		if cacheErr != nil {
+			zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(cacheErr))
+		}
+	}
+
+	processingPostCache := time.Now()
+	selectedSpans, uncollapsedSpans, rootServiceName, rootServiceEntryPoint := tracedetail.GetSelectedSpans(req.UncollapsedSpans, req.SelectedSpanID, traceRoots, spanIdToSpanNodeMap, req.IsSelectedSpanIDUnCollapsed)
+	zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))
+
+	response.Spans = selectedSpans
+	response.UncollapsedSpans = uncollapsedSpans
+	response.StartTimestampMillis = startTime
+	response.EndTimestampMillis = endTime
+	response.TotalSpansCount = totalSpans
+	response.TotalErrorSpansCount = totalErrorSpans
+
response.RootServiceName = rootServiceName + response.RootServiceEntryPoint = rootServiceEntryPoint + response.ServiceNameToTotalDurationMap = serviceNameToTotalDurationMap + response.HasMissingSpans = len(traceRoots) > 1 + return response, nil +} + +func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) { + cachedTraceData := new(model.GetFlamegraphSpansForTraceCache) + cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false) + if err != nil { + zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err), zap.String("traceID", traceID)) + return nil, err + } + + if cacheStatus != cache.RetrieveStatusHit { + return nil, errors.Errorf("cache status for getFlamegraphSpansForTrace : %s, traceID: %s", cacheStatus, traceID) + } + + if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + zap.L().Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", zap.String("traceID", traceID)) + return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache, traceID: %s", traceID) + } + + zap.L().Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) + return cachedTraceData, nil +} + +func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) { + trace := new(model.GetFlamegraphSpansForTraceResponse) + var startTime, endTime, durationNano uint64 + var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{} + // map[traceID][level]span + var selectedSpans = [][]*model.FlamegraphSpan{} + var traceRoots []*model.FlamegraphSpan + + // get the trace tree from cache! 
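// A condensed sketch of the gate both trace-detail caches apply (shouldUseCachedTrace and its
// parameters are hypothetical, not part of this change): entries are keyed "<functionName>-<traceID>",
// stored for five minutes, and ignored when the trace ended inside fluxIntervalForTraceDetail
// (2m by default via the new -flux-interval-trace-detail flag), since such a trace may still be
// receiving spans.
//
//	func shouldUseCachedTrace(cacheHit bool, endTimeMillis uint64, fluxInterval time.Duration) bool {
//		return cacheHit && time.Since(time.UnixMilli(int64(endTimeMillis))) >= fluxInterval
//	}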
+ cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, traceID) + + if err == nil { + startTime = cachedTraceData.StartTime + endTime = cachedTraceData.EndTime + durationNano = cachedTraceData.DurationNano + selectedSpans = cachedTraceData.SelectedSpans + traceRoots = cachedTraceData.TraceRoots + } + + if err != nil { + zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) + + searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName)) + if err != nil { + return nil, err + } + if len(searchScanResponses) == 0 { + return trace, nil + } + + processingBeforeCache := time.Now() + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("Error unmarshalling references", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)} + } + + jsonItem := model.FlamegraphSpan{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + DurationNano: item.DurationNano, + HasError: item.HasError, + References: ref, + Children: make([]*model.FlamegraphSpan, 0), + } + + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem + + // metadata calculation + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime { + endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000) + } + if durationNano == 0 || jsonItem.DurationNano > durationNano { + durationNano = jsonItem.DurationNano + } + } + + // traverse through the map and append each node to the children array of the parent node + // and add missing spans + for _, spanNode := range spanIdToSpanNodeMap { + hasParentSpanNode := false + for _, reference := range spanNode.References { + if reference.RefType == "CHILD_OF" && reference.SpanId != "" { + hasParentSpanNode = true + if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists { + parentNode.Children = append(parentNode.Children, spanNode) + } else { + // insert the missing spans + missingSpan := model.FlamegraphSpan{ + SpanID: reference.SpanId, + TraceID: spanNode.TraceID, + ServiceName: "", + Name: "Missing Span", + TimeUnixNano: spanNode.TimeUnixNano, + DurationNano: spanNode.DurationNano, + HasError: false, + Children: make([]*model.FlamegraphSpan, 0), + } + missingSpan.Children = append(missingSpan.Children, spanNode) + spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan + traceRoots = append(traceRoots, &missingSpan) + } + } + } + if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) { + traceRoots = append(traceRoots, spanNode) + } + } + + selectedSpans = tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap) + traceCache := model.GetFlamegraphSpansForTraceCache{ + StartTime: startTime, + EndTime: endTime, + DurationNano: durationNano, + SelectedSpans: selectedSpans, + TraceRoots: traceRoots, + } + + zap.L().Info("getFlamegraphSpansForTrace: processing pre cache", zap.Duration("duration", 
time.Since(processingBeforeCache)), zap.String("traceID", traceID))
+		cacheErr := r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5)
+		if cacheErr != nil {
+			zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(cacheErr))
+		}
+	}
+
+	processingPostCache := time.Now()
+	selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans)
+	zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))
+
+	trace.Spans = selectedSpansForRequest
+	trace.StartTimestampMillis = startTime
+	trace.EndTimestampMillis = endTime
+	return trace, nil
+}
+
 func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
 	response := []model.ServiceMapDependencyResponseItem{}
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index fff4fded57e..e0551440b78 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -546,6 +546,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
 	router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet)
 	router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost)
+	router.HandleFunc("/api/v2/traces/flamegraph/{traceId}", am.ViewAccess(aH.GetFlamegraphSpansForTrace)).Methods(http.MethodPost)
+	router.HandleFunc("/api/v2/traces/waterfall/{traceId}", am.ViewAccess(aH.GetWaterfallSpansForTraceWithMetadata)).Methods(http.MethodPost)
 	router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet)
 	router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet)
@@ -1777,6 +1779,52 @@ func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) {
 }
+
+func (aH *APIHandler) GetWaterfallSpansForTraceWithMetadata(w http.ResponseWriter, r *http.Request) {
+	traceID := mux.Vars(r)["traceId"]
+	if traceID == "" {
+		RespondError(w, model.BadRequest(errors.New("traceID is required")), nil)
+		return
+	}
+
+	req := new(model.GetWaterfallSpansForTraceWithMetadataParams)
+	err := json.NewDecoder(r.Body).Decode(&req)
+	if err != nil {
+		RespondError(w, model.BadRequest(err), nil)
+		return
+	}
+
+	result, apiErr := aH.reader.GetWaterfallSpansForTraceWithMetadata(r.Context(), traceID, req)
+	if apiErr != nil {
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	aH.WriteJSON(w, r, result)
+}
+
+func (aH *APIHandler) GetFlamegraphSpansForTrace(w http.ResponseWriter, r *http.Request) {
+	traceID := mux.Vars(r)["traceId"]
+	if traceID == "" {
+		RespondError(w, model.BadRequest(errors.New("traceID is required")), nil)
+		return
+	}
+
+	req := new(model.GetFlamegraphSpansForTraceParams)
+	err := json.NewDecoder(r.Body).Decode(&req)
+	if err != nil {
+		RespondError(w, model.BadRequest(err), nil)
+		return
+	}
+
+	result, apiErr := aH.reader.GetFlamegraphSpansForTrace(r.Context(), traceID, req)
+	if apiErr != nil {
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	aH.WriteJSON(w, r, result)
+}
+
 func (aH *APIHandler) listErrors(w http.ResponseWriter, r *http.Request) {
 	query, err := parseListErrorsRequest(r)
diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go
index 
fcaf13624fb..e0caf6d8c15 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -1384,6 +1384,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { "", true, true, + time.Duration(time.Second), + nil, ) q := &querier{ diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 75defa88efe..800a684e6c0 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -1438,6 +1438,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { "", true, true, + time.Duration(time.Second), + nil, ) q := &querier{ diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 95c36528ae8..d5e54ba9d7b 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -59,18 +59,19 @@ type ServerOptions struct { HTTPHostPort string PrivateHostPort string // alert specific params - DisableRules bool - RuleRepoURL string - PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration - CacheConfigPath string - FluxInterval string - Cluster string - UseLogsNewSchema bool - UseTraceNewSchema bool - SigNoz *signoz.SigNoz + DisableRules bool + RuleRepoURL string + PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration + CacheConfigPath string + FluxInterval string + FluxIntervalForTraceDetail string + Cluster string + UseLogsNewSchema bool + UseTraceNewSchema bool + SigNoz *signoz.SigNoz } // Server runs HTTP, Mux and a grpc server @@ -120,6 +121,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { readerReady := make(chan bool) + fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) + if err != nil { + return nil, err + } + var reader interfaces.Reader storage := os.Getenv("STORAGE") if storage == "clickhouse" { @@ -134,6 +140,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.Cluster, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, + fluxIntervalForTraceDetail, + serverOptions.SigNoz.Cache, ) go clickhouseReader.Start(readerReady) reader = clickhouseReader @@ -148,6 +156,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } } + var c cache.Cache if serverOptions.CacheConfigPath != "" { cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) diff --git a/pkg/query-service/app/traces/tracedetail/flamegraph.go b/pkg/query-service/app/traces/tracedetail/flamegraph.go new file mode 100644 index 00000000000..b9412ebf8ce --- /dev/null +++ b/pkg/query-service/app/traces/tracedetail/flamegraph.go @@ -0,0 +1,116 @@ +package tracedetail + +import ( + "sort" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +var ( + SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH float64 = 50 +) + +func ContainsFlamegraphSpan(slice []*model.FlamegraphSpan, item *model.FlamegraphSpan) bool { + for _, v := range slice { + if v.SpanID == item.SpanID { + return true + } + } + return false +} + +func BfsTraversalForTrace(span *model.FlamegraphSpan, level int64) map[int64][]*model.FlamegraphSpan { + bfs := map[int64][]*model.FlamegraphSpan{} + bfs[level] = []*model.FlamegraphSpan{span} + + for _, child := range span.Children { + childBfsMap := BfsTraversalForTrace(child, level+1) + for _level, nodes := range childBfsMap { + bfs[_level] = append(bfs[_level], nodes...) 
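// Worked example of the merge above: for a root R with children A and B, where A itself has a
// child A1, the recursive calls return {1: [A], 2: [A1]} and {1: [B]}; merged under R (level 0)
// this becomes {0: [R], 1: [A, B], 2: [A1]}, one slice of spans per flamegraph depth. The
// children are cleared right after (see span.Children below), so the cached result keeps only
// the per-level layout, not the tree itself.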
+ } + } + span.Level = level + span.Children = make([]*model.FlamegraphSpan, 0) + + return bfs +} + +func FindIndexForSelectedSpan(spans [][]*model.FlamegraphSpan, selectedSpanId string) int { + var selectedSpanLevel int = 0 + + for index, _spans := range spans { + if len(_spans) > 0 && _spans[0].SpanID == selectedSpanId { + selectedSpanLevel = index + break + } + } + + return selectedSpanLevel +} + +func GetSelectedSpansForFlamegraph(traceRoots []*model.FlamegraphSpan, spanIdToSpanNodeMap map[string]*model.FlamegraphSpan) [][]*model.FlamegraphSpan { + + var traceIdLevelledFlamegraph = map[string]map[int64][]*model.FlamegraphSpan{} + selectedSpans := [][]*model.FlamegraphSpan{} + + // sort the trace roots to add missing spans at the right order + sort.Slice(traceRoots, func(i, j int) bool { + if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano { + return traceRoots[i].Name < traceRoots[j].Name + } + return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano + }) + + for _, rootSpanID := range traceRoots { + if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists { + bfsMapForTrace := BfsTraversalForTrace(rootNode, 0) + traceIdLevelledFlamegraph[rootSpanID.SpanID] = bfsMapForTrace + } + } + + for _, trace := range traceRoots { + keys := make([]int64, 0, len(traceIdLevelledFlamegraph[trace.SpanID])) + for key := range traceIdLevelledFlamegraph[trace.SpanID] { + keys = append(keys, key) + } + + sort.Slice(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + + for _, level := range keys { + if ok, exists := traceIdLevelledFlamegraph[trace.SpanID][level]; exists { + selectedSpans = append(selectedSpans, ok) + } + } + } + + return selectedSpans +} + +func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpans [][]*model.FlamegraphSpan) [][]*model.FlamegraphSpan { + var selectedIndex = 0 + + if selectedSpanID != "" { + selectedIndex = FindIndexForSelectedSpan(selectedSpans, selectedSpanID) + } + + lowerLimit := selectedIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.4) + upperLimit := selectedIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.6) + + if lowerLimit < 0 { + upperLimit = upperLimit - lowerLimit + lowerLimit = 0 + } + + if upperLimit > len(selectedSpans) { + lowerLimit = lowerLimit - (upperLimit - len(selectedSpans)) + upperLimit = len(selectedSpans) + } + + if lowerLimit < 0 { + lowerLimit = 0 + } + + return selectedSpans[lowerLimit:upperLimit] +} diff --git a/pkg/query-service/app/traces/tracedetail/waterfall.go b/pkg/query-service/app/traces/tracedetail/waterfall.go new file mode 100644 index 00000000000..36aa88ef4fd --- /dev/null +++ b/pkg/query-service/app/traces/tracedetail/waterfall.go @@ -0,0 +1,214 @@ +package tracedetail + +import ( + "slices" + "sort" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +var ( + SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL float64 = 500 +) + +type Interval struct { + StartTime uint64 + Duration uint64 + Service string +} + +func mergeIntervals(intervals []Interval) []Interval { + if len(intervals) == 0 { + return nil + } + + var merged []Interval + current := intervals[0] + + for i := 1; i < len(intervals); i++ { + next := intervals[i] + if current.StartTime+current.Duration >= next.StartTime { + endTime := max(current.StartTime+current.Duration, next.StartTime+next.Duration) + current.Duration = endTime - current.StartTime + } else { + merged = append(merged, current) + current = next + } + } + // Add the last interval + merged = append(merged, current) + + return merged +} + +func 
ContainsWaterfallSpan(slice []*model.Span, item *model.Span) bool { + for _, v := range slice { + if v.SpanID == item.SpanID { + return true + } + } + return false +} + +func findIndexForSelectedSpanFromPreOrder(spans []*model.Span, selectedSpanId string) int { + var selectedSpanIndex = -1 + + for index, span := range spans { + if span.SpanID == selectedSpanId { + selectedSpanIndex = index + break + } + } + + return selectedSpanIndex +} + +func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, uncollapsedSpans []string, isSelectedSpanIDUnCollapsed bool) (bool, []string) { + spansFromRootToNode := []string{} + + if node.SpanID == selectedSpanId { + if isSelectedSpanIDUnCollapsed { + spansFromRootToNode = append(spansFromRootToNode, node.SpanID) + } + return true, spansFromRootToNode + } + + isPresentInSubtreeForTheNode := false + for _, child := range node.Children { + isPresentInThisSubtree, _spansFromRootToNode := getPathFromRootToSelectedSpanId(child, selectedSpanId, uncollapsedSpans, isSelectedSpanIDUnCollapsed) + // if the interested node is present in the given subtree then add the span node to uncollapsed node list + if isPresentInThisSubtree { + if !slices.Contains(uncollapsedSpans, node.SpanID) { + spansFromRootToNode = append(spansFromRootToNode, node.SpanID) + } + isPresentInSubtreeForTheNode = true + spansFromRootToNode = append(spansFromRootToNode, _spansFromRootToNode...) + } + } + return isPresentInSubtreeForTheNode, spansFromRootToNode +} + +func traverseTrace(span *model.Span, uncollapsedSpans []string, level uint64, isPartOfPreOrder bool, hasSibling bool, selectedSpanId string) []*model.Span { + preOrderTraversal := []*model.Span{} + + // sort the children to maintain the order across requests + sort.Slice(span.Children, func(i, j int) bool { + if span.Children[i].TimeUnixNano == span.Children[j].TimeUnixNano { + return span.Children[i].Name < span.Children[j].Name + } + return span.Children[i].TimeUnixNano < span.Children[j].TimeUnixNano + }) + + span.SubTreeNodeCount = 0 + nodeWithoutChildren := model.Span{ + SpanID: span.SpanID, + TraceID: span.TraceID, + ServiceName: span.ServiceName, + TimeUnixNano: span.TimeUnixNano, + Name: span.Name, + Kind: int32(span.Kind), + DurationNano: span.DurationNano, + HasError: span.HasError, + StatusMessage: span.StatusMessage, + StatusCodeString: span.StatusCodeString, + SpanKind: span.SpanKind, + References: span.References, + Events: span.Events, + TagMap: span.TagMap, + Children: make([]*model.Span, 0), + HasChildren: len(span.Children) > 0, + Level: level, + HasSiblings: hasSibling, + SubTreeNodeCount: 0, + } + + if isPartOfPreOrder { + preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren) + } + + for index, child := range span.Children { + _childTraversal := traverseTrace(child, uncollapsedSpans, level+1, isPartOfPreOrder && slices.Contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId) + preOrderTraversal = append(preOrderTraversal, _childTraversal...) 
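// Worked example for the counters below: a root span with two leaf children flattens to the
// pre-order slice [root, child1, child2]. Each leaf child still has SubTreeNodeCount 0 after its
// own traversal, so it contributes 0 + 1 = 1 to its parent here, and the final "+= 1" counts the
// node itself, giving the root a SubTreeNodeCount of 3. Children are appended to the returned
// pre-order slice only while their parent is listed in uncollapsedSpans (isPartOfPreOrder).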
+			nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1
+			span.SubTreeNodeCount += child.SubTreeNodeCount + 1
+	}
+
+	nodeWithoutChildren.SubTreeNodeCount += 1
+	return preOrderTraversal
+
+}
+
+func CalculateServiceTime(serviceIntervals map[string][]Interval) map[string]uint64 {
+	totalTimes := make(map[string]uint64)
+
+	for service, serviceIntervals := range serviceIntervals {
+		sort.Slice(serviceIntervals, func(i, j int) bool {
+			return serviceIntervals[i].StartTime < serviceIntervals[j].StartTime
+		})
+		mergedIntervals := mergeIntervals(serviceIntervals)
+		totalTime := uint64(0)
+		for _, interval := range mergedIntervals {
+			totalTime += interval.Duration
+		}
+		totalTimes[service] = totalTime
+	}
+
+	return totalTimes
+}
+
+func GetSelectedSpans(uncollapsedSpans []string, selectedSpanID string, traceRoots []*model.Span, spanIdToSpanNodeMap map[string]*model.Span, isSelectedSpanIDUnCollapsed bool) ([]*model.Span, []string, string, string) {
+
+	var preOrderTraversal = []*model.Span{}
+	var rootServiceName, rootServiceEntryPoint string
+	updatedUncollapsedSpans := uncollapsedSpans
+
+	selectedSpanIndex := -1
+	for _, rootSpanID := range traceRoots {
+		if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists {
+			_, spansFromRootToNode := getPathFromRootToSelectedSpanId(rootNode, selectedSpanID, updatedUncollapsedSpans, isSelectedSpanIDUnCollapsed)
+			updatedUncollapsedSpans = append(updatedUncollapsedSpans, spansFromRootToNode...)
+
+			_preOrderTraversal := traverseTrace(rootNode, updatedUncollapsedSpans, 0, true, false, selectedSpanID)
+			_selectedSpanIndex := findIndexForSelectedSpanFromPreOrder(_preOrderTraversal, selectedSpanID)
+
+			if _selectedSpanIndex != -1 {
+				selectedSpanIndex = _selectedSpanIndex + len(preOrderTraversal)
+			}
+
+			preOrderTraversal = append(preOrderTraversal, _preOrderTraversal...)
+
+			if rootServiceName == "" {
+				rootServiceName = rootNode.ServiceName
+			}
+
+			if rootServiceEntryPoint == "" {
+				rootServiceEntryPoint = rootNode.Name
+			}
+		}
+	}
+
+	// if we couldn't find the selectedSpan in the trace, default the selected index to 0
+	if selectedSpanIndex == -1 && selectedSpanID != "" {
+		selectedSpanIndex = 0
+	}
+
+	// get the 0.4*[span limit] before the interested span index
+	startIndex := selectedSpanIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL*0.4)
+	// get the 0.6*[span limit] after the interested span index
+	endIndex := selectedSpanIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL*0.6)
+
+	// adjust the sliding window according to the available left and right spaces.
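// Worked example, assuming SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL = 500: for selectedSpanIndex = 120
// the initial window is [120-200, 120+300) = [-80, 420). Since the start is negative, the
// adjustment below shifts it to [0, 500), keeping the full window width whenever the pre-order
// traversal holds at least 500 spans; a window overflowing the end is shifted left the same way.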
+ if startIndex < 0 { + endIndex = endIndex - startIndex + startIndex = 0 + } + if endIndex > len(preOrderTraversal) { + startIndex = startIndex - (endIndex - len(preOrderTraversal)) + endIndex = len(preOrderTraversal) + } + if startIndex < 0 { + startIndex = 0 + } + + return preOrderTraversal[startIndex:endIndex], updatedUncollapsedSpans, rootServiceName, rootServiceEntryPoint +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index ac4ab91f9eb..0199113a74e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -41,6 +41,8 @@ type Reader interface { // Search Interfaces SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) + GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) + GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 76498d7372d..7fd7c357924 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -45,7 +45,7 @@ func main() { var useLogsNewSchema bool var useTraceNewSchema bool // the url used to build link in the alert messages in slack and other systems - var ruleRepoURL, cacheConfigPath, fluxInterval string + var ruleRepoURL, cacheConfigPath, fluxInterval, fluxIntervalForTraceDetail string var cluster string var preferSpanMetrics bool @@ -63,6 +63,7 @@ func main() { flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)") + flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)") flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')") // Allow using the consistent naming with the signoz collector flag.StringVar(&cluster, "cluster-name", "cluster", "(cluster name - defaults to 'cluster')") @@ -95,23 +96,24 @@ func main() { } serverOptions := &app.ServerOptions{ - Config: config, - HTTPHostPort: constants.HTTPHostPort, - PromConfigPath: promConfigPath, - SkipTopLvlOpsPath: skipTopLvlOpsPath, - PreferSpanMetrics: preferSpanMetrics, - PrivateHostPort: constants.PrivateHostPort, - DisableRules: disableRules, - RuleRepoURL: ruleRepoURL, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - DialTimeout: dialTimeout, - CacheConfigPath: cacheConfigPath, - FluxInterval: fluxInterval, - Cluster: cluster, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, - SigNoz: signoz, + Config: config, + HTTPHostPort: constants.HTTPHostPort, + PromConfigPath: promConfigPath, + SkipTopLvlOpsPath: skipTopLvlOpsPath, + 
PreferSpanMetrics: preferSpanMetrics, + PrivateHostPort: constants.PrivateHostPort, + DisableRules: disableRules, + RuleRepoURL: ruleRepoURL, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, + FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, + Cluster: cluster, + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, + SigNoz: signoz, } // Read the jwt secret key diff --git a/pkg/query-service/model/cacheable.go b/pkg/query-service/model/cacheable.go new file mode 100644 index 00000000000..b691b2765a3 --- /dev/null +++ b/pkg/query-service/model/cacheable.go @@ -0,0 +1,36 @@ +package model + +import "encoding/json" + +type GetWaterfallSpansForTraceWithMetadataCache struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + DurationNano uint64 `json:"durationNano"` + TotalSpans uint64 `json:"totalSpans"` + TotalErrorSpans uint64 `json:"totalErrorSpans"` + ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"` + SpanIdToSpanNodeMap map[string]*Span `json:"spanIdToSpanNodeMap"` + TraceRoots []*Span `json:"traceRoots"` +} + +func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) { + return json.Marshal(c) +} +func (c *GetWaterfallSpansForTraceWithMetadataCache) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, c) +} + +type GetFlamegraphSpansForTraceCache struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + DurationNano uint64 `json:"durationNano"` + SelectedSpans [][]*FlamegraphSpan `json:"selectedSpans"` + TraceRoots []*FlamegraphSpan `json:"traceRoots"` +} + +func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) { + return json.Marshal(c) +} +func (c *GetFlamegraphSpansForTraceCache) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, c) +} diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 342f8f10f0b..e2717fbd876 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -315,6 +315,16 @@ type SearchTracesParams struct { MaxSpansInTrace int `json:"maxSpansInTrace"` } +type GetWaterfallSpansForTraceWithMetadataParams struct { + SelectedSpanID string `json:"selectedSpanId"` + IsSelectedSpanIDUnCollapsed bool `json:"isSelectedSpanIDUnCollapsed"` + UncollapsedSpans []string `json:"uncollapsedSpans"` +} + +type GetFlamegraphSpansForTraceParams struct { + SelectedSpanID string `json:"selectedSpanId"` +} + type SpanFilterParams struct { TraceID []string `json:"traceID"` Status []string `json:"status"` diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index fe8aec6765f..650571dcf08 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -118,6 +118,13 @@ func ForbiddenError(err error) *ApiError { } } +func ExecutionError(err error) *ApiError { + return &ApiError{ + Typ: ErrorExec, + Err: err, + } +} + func WrapApiError(err *ApiError, msg string) *ApiError { return &ApiError{ Typ: err.Type(), @@ -269,6 +276,67 @@ type SearchSpanResponseItem struct { SpanKind string `json:"spanKind"` } +type Span struct { + TimeUnixNano uint64 `json:"timestamp"` + DurationNano uint64 `json:"durationNano"` + SpanID string `json:"spanId"` + RootSpanID string `json:"rootSpanId"` + TraceID string `json:"traceId"` + HasError bool `json:"hasError"` 
+ Kind int32 `json:"kind"` + ServiceName string `json:"serviceName"` + Name string `json:"name"` + References []OtelSpanRef `json:"references,omitempty"` + TagMap map[string]string `json:"tagMap"` + Events []string `json:"event"` + RootName string `json:"rootName"` + StatusMessage string `json:"statusMessage"` + StatusCodeString string `json:"statusCodeString"` + SpanKind string `json:"spanKind"` + Children []*Span `json:"children"` + + // the below two fields are for frontend to render the spans + SubTreeNodeCount uint64 `json:"subTreeNodeCount"` + HasChildren bool `json:"hasChildren"` + HasSiblings bool `json:"hasSiblings"` + Level uint64 `json:"level"` +} + +type FlamegraphSpan struct { + TimeUnixNano uint64 `json:"timestamp"` + DurationNano uint64 `json:"durationNano"` + SpanID string `json:"spanId"` + TraceID string `json:"traceId"` + HasError bool `json:"hasError"` + ServiceName string `json:"serviceName"` + Name string `json:"name"` + Level int64 `json:"level"` + References []OtelSpanRef `json:"references,omitempty"` + Children []*FlamegraphSpan `json:"children"` +} + +type GetWaterfallSpansForTraceWithMetadataResponse struct { + StartTimestampMillis uint64 `json:"startTimestampMillis"` + EndTimestampMillis uint64 `json:"endTimestampMillis"` + DurationNano uint64 `json:"durationNano"` + RootServiceName string `json:"rootServiceName"` + RootServiceEntryPoint string `json:"rootServiceEntryPoint"` + TotalSpansCount uint64 `json:"totalSpansCount"` + TotalErrorSpansCount uint64 `json:"totalErrorSpansCount"` + ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"` + Spans []*Span `json:"spans"` + HasMissingSpans bool `json:"hasMissingSpans"` + // this is needed for frontend and query service sync + UncollapsedSpans []string `json:"uncollapsedSpans"` +} + +type GetFlamegraphSpansForTraceResponse struct { + StartTimestampMillis uint64 `json:"startTimestampMillis"` + EndTimestampMillis uint64 `json:"endTimestampMillis"` + DurationNano uint64 `json:"durationNano"` + Spans [][]*FlamegraphSpan `json:"spans"` +} + type OtelSpanRef struct { TraceId string `json:"traceId,omitempty"` SpanId string `json:"spanId,omitempty"` diff --git a/pkg/query-service/model/trace.go b/pkg/query-service/model/trace.go index e8d3d70ac27..ccee5a83d24 100644 --- a/pkg/query-service/model/trace.go +++ b/pkg/query-service/model/trace.go @@ -20,6 +20,7 @@ type SpanItemV2 struct { StatusMessage string `ch:"status_message"` StatusCodeString string `ch:"status_code_string"` SpanKind string `ch:"kind_string"` + ParentSpanId string `ch:"parent_span_id"` } type TraceSummary struct { diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 5a4ab9c9708..7218ca2167a 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -1241,7 +1241,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1340,7 +1340,7 @@ func TestThresholdRuleNoData(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", 
"archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1448,7 +1448,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1573,7 +1573,7 @@ func TestThresholdRuleLogsLink(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index c1c71808a99..e2db99d0190 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -47,6 +47,8 @@ func NewMockClickhouseReader( "", true, true, + time.Duration(time.Second), + nil, ) return reader, mockDB