From 835d7a3c8333ab68fa42c5e53c59dc760dff2f63 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 23 Jan 2025 01:02:48 +0530 Subject: [PATCH 1/5] feat(trace-details): query service changes for trace details --- ee/query-service/app/db/reader.go | 6 +- ee/query-service/app/server.go | 60 +- ee/query-service/main.go | 42 +- .../app/clickhouseReader/reader.go | 524 +++++++++++++++++- .../app/clickhouseReader/utils.go | 125 +++++ pkg/query-service/app/http_handler.go | 48 ++ pkg/query-service/app/querier/querier_test.go | 2 + .../app/querier/v2/querier_test.go | 2 + pkg/query-service/app/server.go | 59 +- pkg/query-service/interfaces/interface.go | 2 + pkg/query-service/main.go | 38 +- pkg/query-service/model/queryParams.go | 10 + pkg/query-service/model/response.go | 96 ++++ pkg/query-service/model/trace.go | 1 + .../rules/threshold_rule_test.go | 8 +- .../tests/integration/test_utils.go | 2 + 16 files changed, 930 insertions(+), 95 deletions(-) create mode 100644 pkg/query-service/app/clickhouseReader/utils.go diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index 9794abd0137..6c01f965519 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -7,6 +7,7 @@ import ( "github.com/jmoiron/sqlx" + cacheV2 "go.signoz.io/signoz/pkg/cache" basechr "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/interfaces" ) @@ -17,6 +18,7 @@ type ClickhouseReader struct { *basechr.ClickHouseReader } +// dummy func NewDataConnector( localDB *sqlx.DB, promConfigPath string, @@ -27,8 +29,10 @@ func NewDataConnector( cluster string, useLogsNewSchema bool, useTraceNewSchema bool, + fluxIntervalForTraceDetail time.Duration, + cacheV2 cacheV2.Cache, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema) + ch := basechr.NewReader(localDB, promConfigPath, lm, 
maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cacheV2) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index acc5132dc24..7377e3a671c 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -71,18 +71,19 @@ type ServerOptions struct { HTTPHostPort string PrivateHostPort string // alert specific params - DisableRules bool - RuleRepoURL string - PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration - CacheConfigPath string - FluxInterval string - Cluster string - GatewayUrl string - UseLogsNewSchema bool - UseTraceNewSchema bool + DisableRules bool + RuleRepoURL string + PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration + CacheConfigPath string + FluxInterval string + FluxIntervalForTraceDetail string + Cluster string + GatewayUrl string + UseLogsNewSchema bool + UseTraceNewSchema bool } // Server runs HTTP api service @@ -141,10 +142,29 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + // set license manager as feature flag provider in dao modelDao.SetFlagProvider(lm) readerReady := make(chan bool) + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) + if err != nil { + return nil, err + } + + fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) + if err != nil { + return nil, err + } + var reader interfaces.DataConnector storage := os.Getenv("STORAGE") if storage == "clickhouse" { @@ -159,6 +179,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.Cluster, 
serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, + fluxIntervalForTraceDetail, + serverOptions.SigNoz.Cache, ) go qb.Start(readerReady) reader = qb @@ -173,14 +195,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } } - var c cache.Cache - if serverOptions.CacheConfigPath != "" { - cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) - if err != nil { - return nil, err - } - c = cache.NewCache(cacheOpts) - } <-readerReady rm, err := makeRulesManager(serverOptions.PromConfigPath, @@ -249,12 +263,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetReader(reader) telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey) - fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) - - if err != nil { - return nil, err - } - apiOpts := api.APIHandlerOptions{ DataConnector: reader, SkipConfig: skipConfig, diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 9a681f395ee..9fdb4f17c41 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -99,7 +99,7 @@ func main() { var useLogsNewSchema bool var useTraceNewSchema bool - var cacheConfigPath, fluxInterval string + var cacheConfigPath, fluxInterval, fluxIntervalForTraceDetail string var enableQueryServiceLogOTLPExport bool var preferSpanMetrics bool @@ -120,7 +120,8 @@ func main() { flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") - flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)") + flag.StringVar(&fluxInterval, "flux-interval", "0m", "(the interval to exclude data from 
being cached to avoid incorrect cache for data in motion)") + flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')") flag.StringVar(&gatewayUrl, "gateway-url", "", "(url to the gateway)") @@ -151,24 +152,25 @@ func main() { } serverOptions := &app.ServerOptions{ - Config: config, - SigNoz: signoz, - HTTPHostPort: baseconst.HTTPHostPort, - PromConfigPath: promConfigPath, - SkipTopLvlOpsPath: skipTopLvlOpsPath, - PreferSpanMetrics: preferSpanMetrics, - PrivateHostPort: baseconst.PrivateHostPort, - DisableRules: disableRules, - RuleRepoURL: ruleRepoURL, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - DialTimeout: dialTimeout, - CacheConfigPath: cacheConfigPath, - FluxInterval: fluxInterval, - Cluster: cluster, - GatewayUrl: gatewayUrl, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, + Config: config, + SigNoz: signoz, + HTTPHostPort: baseconst.HTTPHostPort, + PromConfigPath: promConfigPath, + SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferSpanMetrics: preferSpanMetrics, + PrivateHostPort: baseconst.PrivateHostPort, + DisableRules: disableRules, + RuleRepoURL: ruleRepoURL, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, + FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, + Cluster: cluster, + GatewayUrl: gatewayUrl, + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, } // Read the jwt secret key diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 0a8a5acead2..110625c60ba 100644 
--- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -33,6 +33,8 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/cache" + cacheV2 "go.signoz.io/signoz/pkg/cache" promModel "github.com/prometheus/common/model" "go.uber.org/zap" @@ -156,6 +158,9 @@ type ClickHouseReader struct { traceLocalTableName string traceResourceTableV3 string traceSummaryTable string + + fluxIntervalForTraceDetail time.Duration + cacheV2 cacheV2.Cache } // NewTraceReader returns a TraceReader for the database @@ -169,6 +174,8 @@ func NewReader( cluster string, useLogsNewSchema bool, useTraceNewSchema bool, + fluxIntervalForTraceDetail time.Duration, + cacheV2 cacheV2.Cache, ) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") @@ -179,7 +186,7 @@ func NewReader( zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) } - return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema) + return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cacheV2) } func NewReaderFromClickhouseConnection( @@ -191,6 +198,8 @@ func NewReaderFromClickhouseConnection( cluster string, useLogsNewSchema bool, useTraceNewSchema bool, + fluxIntervalForTraceDetail time.Duration, + cacheV2 cacheV2.Cache, ) *ClickHouseReader { alertManager, err := am.New() if err != nil { @@ -277,6 +286,9 @@ func NewReaderFromClickhouseConnection( traceTableName: traceTableName, traceResourceTableV3: options.primary.TraceResourceTableV3, traceSummaryTable: options.primary.TraceSummaryTable, + + fluxIntervalForTraceDetail: fluxIntervalForTraceDetail, + cacheV2: cacheV2, } } @@ -1442,6 +1454,516 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc 
return &searchSpansResult, nil } +type Interval struct { + StartTime uint64 + Duration uint64 + Service string +} + +func calculateServiceTime(serviceIntervals map[string][]Interval) map[string]uint64 { + totalTimes := make(map[string]uint64) + + for service, serviceIntervals := range serviceIntervals { + sort.Slice(serviceIntervals, func(i, j int) bool { + return serviceIntervals[i].StartTime < serviceIntervals[j].StartTime + }) + mergedIntervals := mergeIntervals(serviceIntervals) + totalTime := uint64(0) + for _, interval := range mergedIntervals { + totalTime += interval.Duration + } + totalTimes[service] = totalTime + } + + return totalTimes +} + +func mergeIntervals(intervals []Interval) []Interval { + if len(intervals) == 0 { + return nil + } + + var merged []Interval + current := intervals[0] + + for i := 1; i < len(intervals); i++ { + next := intervals[i] + if current.StartTime+current.Duration >= next.StartTime { + endTime := max(current.StartTime+current.Duration, next.StartTime+next.Duration) + current.Duration = endTime - current.StartTime + } else { + merged = append(merged, current) + current = next + } + } + // Add the last interval + merged = append(merged, current) + + return merged +} + +func max(a, b uint64) uint64 { + if a > b { + return a + } + return b +} + +func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) { + response := new(model.GetWaterfallSpansForTraceWithMetadataResponse) + var startTime, endTime, durationNano, totalErrorSpans uint64 + var spanIdToSpanNodeMap = map[string]*model.Span{} + var traceRoots []*model.Span + var serviceNameToTotalDurationMap = map[string]uint64{} + var useCache bool = true + + cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache) + cacheStatus, err := r.cacheV2.Retrieve(ctx, 
fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false) + if err != nil { + zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err)) + useCache = false + } + if cacheStatus != cache.RetrieveStatusHit { + useCache = false + } + + if err == nil && cacheStatus == cache.RetrieveStatusHit { + + if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + useCache = false + } + + if useCache { + zap.L().Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) + startTime = cachedTraceData.StartTime + endTime = cachedTraceData.EndTime + durationNano = cachedTraceData.DurationNano + spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap + serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap + traceRoots = cachedTraceData.TraceRoots + response.TotalSpansCount = cachedTraceData.TotalSpans + totalErrorSpans = cachedTraceData.TotalErrorSpans + } + + } + + if !useCache { + zap.L().Info("cache miss for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) + + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return response, nil + } + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + } + response.TotalSpansCount = traceSummary.NumSpans + + var searchScanResponses []model.SpanItemV2 + query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, 
attributes_bool, resources_string, events, status_message, status_code_string, kind_string , parent_span_id FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) + start := time.Now() + err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + zap.L().Info(query) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + } + end := time.Now() + zap.L().Debug("GetWaterfallSpansForTraceWithMetadata took: ", zap.Duration("duration", end.Sub(start))) + + var serviceNameIntervalMap = map[string][]Interval{} + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("Error unmarshalling references", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)} + } + + // merge attributes_number and attributes_bool to attributes_string + for k, v := range item.Attributes_bool { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Attributes_number { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Resources_string { + item.Attributes_string[k] = v + } + + jsonItem := model.Span{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + Kind: int32(item.Kind), + DurationNano: item.DurationNano, + HasError: item.HasError, + StatusMessage: item.StatusMessage, + StatusCodeString: item.StatusCodeString, + SpanKind: item.SpanKind, + References: ref, + Events: item.Events, + TagMap: item.Attributes_string, + ParentSpanId: item.ParentSpanId, + Children: make([]*model.Span, 0), + 
} + + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + + serviceNameIntervalMap[jsonItem.ServiceName] = + append(serviceNameIntervalMap[jsonItem.ServiceName], Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano / 1000000, Service: jsonItem.ServiceName}) + + spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem + + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime { + endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000) + } + if durationNano == 0 || jsonItem.DurationNano > durationNano { + durationNano = jsonItem.DurationNano + } + if jsonItem.HasError { + totalErrorSpans = totalErrorSpans + 1 + } + } + + serviceNameToTotalDurationMap = calculateServiceTime(serviceNameIntervalMap) + + // traverse through the map and append each node to the children array of the parent node + // capture the root nodes as well + for _, spanNode := range spanIdToSpanNodeMap { + hasParentRelationship := false + for _, reference := range spanNode.References { + if reference.RefType == "CHILD_OF" && reference.SpanId != "" { + hasParentRelationship = true + if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists { + parentNode.Children = append(parentNode.Children, spanNode) + } else { + // insert the missing spans + missingSpan := model.Span{ + SpanID: reference.SpanId, + TraceID: spanNode.TraceID, + ServiceName: "", + Name: "Missing Span", + TimeUnixNano: spanNode.TimeUnixNano, + Kind: 0, + DurationNano: spanNode.DurationNano, + HasError: false, + StatusMessage: "", + StatusCodeString: "", + SpanKind: "", + Children: make([]*model.Span, 0), + } + missingSpan.Children = append(missingSpan.Children, spanNode) + spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan + traceRoots = append(traceRoots, &missingSpan) + } + } + } + if !hasParentRelationship { + traceRoots = append(traceRoots, 
spanNode)
+			}
+		}
+
+		sort.Slice(traceRoots, func(i, j int) bool {
+			if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
+				return traceRoots[i].Name < traceRoots[j].Name
+			}
+			return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
+		})
+
+		traceCache := model.GetWaterfallSpansForTraceWithMetadataCache{
+			StartTime:                     startTime,
+			EndTime:                       endTime,
+			DurationNano:                  durationNano,
+			TotalSpans:                    traceSummary.NumSpans,
+			TotalErrorSpans:               totalErrorSpans,
+			SpanIdToSpanNodeMap:           spanIdToSpanNodeMap,
+			ServiceNameToTotalDurationMap: serviceNameToTotalDurationMap,
+			TraceRoots:                    traceRoots,
+		}
+
+		err = r.cacheV2.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5)
+		if err != nil {
+			zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err))
+		}
+	}
+
+	var preOrderTraversal = []*model.Span{}
+	uncollapsedSpans := req.UncollapsedSpans
+
+	selectedSpanIndex := -1
+	for _, rootSpanID := range traceRoots {
+		if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists {
+			_, _spansFromRootToNode := getPathFromRootToSelectedSpanId(rootNode, req.SelectedSpanID, uncollapsedSpans, req.IsSelectedSpanIDUnCollapsed)
+			uncollapsedSpans = append(uncollapsedSpans, _spansFromRootToNode...)
+			_preOrderTraversal := traverseTraceAndAddRequiredMetadata(rootNode, uncollapsedSpans, 0, true, false, req.SelectedSpanID)
+
+			_selectedSpanIndex := findIndexForSelectedSpanFromPreOrder(_preOrderTraversal, req.SelectedSpanID)
+			if _selectedSpanIndex != -1 {
+				selectedSpanIndex = _selectedSpanIndex + len(preOrderTraversal)
+			}
+			preOrderTraversal = append(preOrderTraversal, _preOrderTraversal...)
+
+			if response.RootServiceName == "" {
+				response.RootServiceName = rootNode.ServiceName
+			}
+
+			if response.RootServiceEntryPoint == "" {
+				response.RootServiceEntryPoint = rootNode.Name
+			}
+		}
+	}
+
+	// the index of the interested span id shouldn't be -1 as the span should exist
+	if selectedSpanIndex == -1 && req.SelectedSpanID != "" {
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("selected span ID not found in the traversal")}
+	}
+	// get the 0.4*[span limit] before the interested span index
+	startIndex := selectedSpanIndex - 200
+	// get the 0.6*[span limit] after the interested span index
+	endIndex := selectedSpanIndex + 300
+	// adjust the sliding window according to the available left and right spaces.
+	if startIndex < 0 {
+		endIndex = endIndex - startIndex
+		startIndex = 0
+	}
+	if endIndex > len(preOrderTraversal) {
+		startIndex = startIndex - (endIndex - len(preOrderTraversal))
+		endIndex = len(preOrderTraversal)
+	}
+
+	if startIndex < 0 {
+		startIndex = 0
+	}
+	selectedSpans := preOrderTraversal[startIndex:endIndex]
+
+	// generate the response [ spans , metadata ]
+	response.Spans = selectedSpans
+	response.UncollapsedSpans = uncollapsedSpans
+	response.StartTimestampMillis = startTime
+	response.EndTimestampMillis = endTime
+	response.TotalErrorSpansCount = totalErrorSpans
+	response.ServiceNameToTotalDurationMap = serviceNameToTotalDurationMap
+	response.HasMissingSpans = len(traceRoots) > 1
+	return response, nil
+}
+
+func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) {
+	trace := new(model.GetFlamegraphSpansForTraceResponse)
+	var startTime, endTime, durationNano uint64
+	var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{}
+	// map[traceID][level]span
+	var traceIdLevelledFlamegraph = map[string]map[int64][]*model.FlamegraphSpan{}
+	var selectedSpans =
[][]*model.FlamegraphSpan{} + var traceRoots []*model.FlamegraphSpan + var useCache bool = true + + // get the trace tree from cache! + cachedTraceData := new(model.GetFlamegraphSpansForTraceCache) + cacheStatus, err := r.cacheV2.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false) + if err != nil { + zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err)) + useCache = false + } + + if cacheStatus != cache.RetrieveStatusHit { + useCache = false + } + + if err == nil && cacheStatus == cache.RetrieveStatusHit { + + if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + useCache = false + } + + if useCache { + zap.L().Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) + startTime = cachedTraceData.StartTime + endTime = cachedTraceData.EndTime + durationNano = cachedTraceData.DurationNano + selectedSpans = cachedTraceData.SelectedSpans + traceRoots = cachedTraceData.TraceRoots + } + } + + if !useCache { + zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) + + // fetch the start, end and number of spans from the summary table, start and end are required for the trace query + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return trace, nil + } + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + } + + // fetch all the spans belonging to the trace from the main table + var searchScanResponses []model.SpanItemV2 + query := fmt.Sprintf("SELECT timestamp, 
duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name,parent_span_id FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) + start := time.Now() + err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + zap.L().Info(query) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + } + end := time.Now() + zap.L().Debug("getFlamegraphSpansForTrace took: ", zap.Duration("duration", end.Sub(start))) + + // create the trace tree based on the spans fetched above + // create a map of [spanId]: spanNode + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("Error unmarshalling references", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)} + } + // create the span node + jsonItem := model.FlamegraphSpan{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + DurationNano: (item.DurationNano), + HasError: item.HasError, + ParentSpanId: item.ParentSpanId, + References: ref, + Children: make([]*model.FlamegraphSpan, 0), + } + + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + // assign the span node to the span map + spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem + + // metadata calculation + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime { + endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000) + } + 
if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { + durationNano = uint64(jsonItem.DurationNano) + } + } + + // traverse through the map and append each node to the children array of the parent node + for _, spanNode := range spanIdToSpanNodeMap { + hasParentRelationship := false + for _, reference := range spanNode.References { + if reference.RefType == "CHILD_OF" && reference.SpanId != "" { + hasParentRelationship = true + if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists { + parentNode.Children = append(parentNode.Children, spanNode) + } else { + // insert the missing spans + missingSpan := model.FlamegraphSpan{ + SpanID: reference.SpanId, + TraceID: spanNode.TraceID, + ServiceName: "", + Name: "Missing Span", + TimeUnixNano: spanNode.TimeUnixNano, + DurationNano: spanNode.DurationNano, + HasError: false, + Children: make([]*model.FlamegraphSpan, 0), + } + missingSpan.Children = append(missingSpan.Children, spanNode) + spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan + traceRoots = append(traceRoots, &missingSpan) + } + } + } + if !hasParentRelationship { + traceRoots = append(traceRoots, spanNode) + } + } + + sort.Slice(traceRoots, func(i, j int) bool { + if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano { + return traceRoots[i].Name < traceRoots[j].Name + } + return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano + }) + + var bfsMapForTrace = map[int64][]*model.FlamegraphSpan{} + for _, rootSpanID := range traceRoots { + if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists { + bfsMapForTrace = map[int64][]*model.FlamegraphSpan{} + bfsTraversalForTrace(rootNode, 0, &bfsMapForTrace) + traceIdLevelledFlamegraph[rootSpanID.SpanID] = bfsMapForTrace + } + } + + for _, trace := range traceRoots { + keys := make([]int64, 0, len(traceIdLevelledFlamegraph[trace.SpanID])) + for key := range traceIdLevelledFlamegraph[trace.SpanID] { + keys = append(keys, key) + } + sort.Slice(keys, func(i, 
j int) bool { + return keys[i] < keys[j] + }) + + for _, level := range keys { + if ok, exists := traceIdLevelledFlamegraph[trace.SpanID][level]; exists { + selectedSpans = append(selectedSpans, ok) + } + } + } + + traceCache := model.GetFlamegraphSpansForTraceCache{ + StartTime: startTime, + EndTime: endTime, + DurationNano: durationNano, + SelectedSpans: selectedSpans, + TraceRoots: traceRoots, + } + + err = r.cacheV2.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5) + if err != nil { + zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(err)) + } + } + + var selectedIndex int64 = 0 + if req.SelectedSpanID != "" { + selectedIndex = findIndexForSelectedSpan(selectedSpans, req.SelectedSpanID) + } + + lowerLimit := selectedIndex - 20 + upperLimit := selectedIndex + 30 + + if lowerLimit < 0 { + upperLimit = upperLimit - lowerLimit + lowerLimit = 0 + } + + if upperLimit > int64(len(selectedSpans)) { + lowerLimit = lowerLimit - (upperLimit - int64(len(selectedSpans))) + upperLimit = int64(len(selectedSpans)) + } + + if lowerLimit < 0 { + lowerLimit = 0 + } + + trace.Spans = selectedSpans[lowerLimit:upperLimit] + trace.StartTimestampMillis = startTime + trace.EndTimestampMillis = endTime + return trace, nil +} + func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { response := []model.ServiceMapDependencyResponseItem{} diff --git a/pkg/query-service/app/clickhouseReader/utils.go b/pkg/query-service/app/clickhouseReader/utils.go new file mode 100644 index 00000000000..19f3203063d --- /dev/null +++ b/pkg/query-service/app/clickhouseReader/utils.go @@ -0,0 +1,125 @@ +package clickhouseReader + +import ( + "sort" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +func contains(slice []string, item string) bool { + for _, v := range slice { + if v == item { + return 
true + } + } + return false +} + +func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, uncollapsedSpans []string, isSelectedSpanIDUnCollapsed bool) (bool, []string) { + spansFromRootToNode := []string{} + if node.SpanID == selectedSpanId { + if isSelectedSpanIDUnCollapsed { + spansFromRootToNode = append(spansFromRootToNode, node.SpanID) + } + return true, spansFromRootToNode + } + isPresentInSubtreeForTheNode := false + for _, child := range node.Children { + isPresentInThisSubtree, _spansFromRootToNode := getPathFromRootToSelectedSpanId(child, selectedSpanId, uncollapsedSpans, isSelectedSpanIDUnCollapsed) + // if the interested node is present in the given subtree then add the span node to uncollapsed node list + if isPresentInThisSubtree { + if !contains(uncollapsedSpans, node.SpanID) { + spansFromRootToNode = append(spansFromRootToNode, node.SpanID) + } + isPresentInSubtreeForTheNode = true + spansFromRootToNode = append(spansFromRootToNode, _spansFromRootToNode...) 
+ } + } + return isPresentInSubtreeForTheNode, spansFromRootToNode +} + +func traverseTraceAndAddRequiredMetadata(span *model.Span, uncollapsedSpans []string, level uint64, isPartOfPreorder bool, hasSibling bool, selectedSpanId string) []*model.Span { + preOrderTraversal := []*model.Span{} + sort.Slice(span.Children, func(i, j int) bool { + if span.Children[i].TimeUnixNano == span.Children[j].TimeUnixNano { + return span.Children[i].Name < span.Children[j].Name + } + return span.Children[i].TimeUnixNano < span.Children[j].TimeUnixNano + }) + span.SubTreeNodeCount = 0 + nodeWithoutChildren := model.Span{ + SpanID: span.SpanID, + TraceID: span.TraceID, + ServiceName: span.ServiceName, + TimeUnixNano: span.TimeUnixNano, + Name: span.Name, + Kind: int32(span.Kind), + DurationNano: span.DurationNano, + HasError: span.HasError, + StatusMessage: span.StatusMessage, + StatusCodeString: span.StatusCodeString, + SpanKind: span.SpanKind, + References: span.References, + Events: span.Events, + TagMap: span.TagMap, + ParentSpanId: span.ParentSpanId, + Children: make([]*model.Span, 0), + HasChildren: len(span.Children) > 0, + Level: level, + HasSiblings: hasSibling, + SubTreeNodeCount: 0, + } + if isPartOfPreorder { + preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren) + } + + for index, child := range span.Children { + _childTraversal := traverseTraceAndAddRequiredMetadata(child, uncollapsedSpans, level+1, isPartOfPreorder && contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId) + preOrderTraversal = append(preOrderTraversal, _childTraversal...) 
+ nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1 + span.SubTreeNodeCount += child.SubTreeNodeCount + 1 + } + + return preOrderTraversal + +} + +func bfsTraversalForTrace(span *model.FlamegraphSpan, level int64, bfsMap *map[int64][]*model.FlamegraphSpan) { + ok, exists := (*bfsMap)[level] + span.Level = level + if exists { + (*bfsMap)[level] = append(ok, span) + } else { + (*bfsMap)[level] = []*model.FlamegraphSpan{span} + } + for _, child := range span.Children { + bfsTraversalForTrace(child, level+1, bfsMap) + } + span.Children = make([]*model.FlamegraphSpan, 0) +} + +func findIndexForSelectedSpan(spans [][]*model.FlamegraphSpan, selectedSpanId string) int64 { + var selectedSpanLevel int64 = 0 + + for index, _spans := range spans { + if len(_spans) > 0 && _spans[0].SpanID == selectedSpanId { + selectedSpanLevel = int64(index) + break + } + } + + return selectedSpanLevel +} + +func findIndexForSelectedSpanFromPreOrder(spans []*model.Span, selectedSpanId string) int { + var selectedSpanIndex = -1 + + for index, span := range spans { + if span.SpanID == selectedSpanId { + selectedSpanIndex = index + break + } + } + + return selectedSpanIndex +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index fff4fded57e..6a764c51de1 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -546,6 +546,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet) router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost) + router.HandleFunc("/api/v2/traces/flamegraph/{traceId}", am.ViewAccess(aH.GetFlamegraphSpansForTrace)).Methods(http.MethodPost) + router.HandleFunc("/api/v2/traces/{traceId}", am.ViewAccess(aH.GetWaterfallSpansForTraceWithMetadata)).Methods(http.MethodPost) router.HandleFunc("/api/v1/version", 
am.OpenAccess(aH.getVersion)).Methods(http.MethodGet) router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet) @@ -1777,6 +1779,52 @@ func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) { } +func (aH *APIHandler) GetWaterfallSpansForTraceWithMetadata(w http.ResponseWriter, r *http.Request) { + traceID := mux.Vars(r)["traceId"] + if traceID == "" { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: errors.New("traceID is required")}, nil) + return + } + + req := new(model.GetWaterfallSpansForTraceWithMetadataParams) + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, model.BadRequest(err), nil) + return + } + + result, apiErr := aH.reader.GetWaterfallSpansForTraceWithMetadata(r.Context(), traceID, req) + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + + aH.WriteJSON(w, r, result) +} + +func (aH *APIHandler) GetFlamegraphSpansForTrace(w http.ResponseWriter, r *http.Request) { + traceID := mux.Vars(r)["traceId"] + if traceID == "" { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: errors.New("traceID is required")}, nil) + return + } + + req := new(model.GetFlamegraphSpansForTraceParams) + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, model.BadRequest(err), nil) + return + } + + result, apiErr := aH.reader.GetFlamegraphSpansForTrace(r.Context(), traceID, req) + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + + aH.WriteJSON(w, r, result) +} + func (aH *APIHandler) listErrors(w http.ResponseWriter, r *http.Request) { query, err := parseListErrorsRequest(r) diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index fcaf13624fb..e0caf6d8c15 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -1384,6 +1384,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { 
"", true, true, + time.Duration(time.Second), + nil, ) q := &querier{ diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 75defa88efe..800a684e6c0 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -1438,6 +1438,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { "", true, true, + time.Duration(time.Second), + nil, ) q := &querier{ diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index f03e96a8749..3ad81b8223a 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -62,18 +62,19 @@ type ServerOptions struct { HTTPHostPort string PrivateHostPort string // alert specific params - DisableRules bool - RuleRepoURL string - PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration - CacheConfigPath string - FluxInterval string - Cluster string - UseLogsNewSchema bool - UseTraceNewSchema bool - SigNoz *signoz.SigNoz + DisableRules bool + RuleRepoURL string + PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration + CacheConfigPath string + FluxInterval string + FluxIntervalForTraceDetail string + Cluster string + UseLogsNewSchema bool + UseTraceNewSchema bool + SigNoz *signoz.SigNoz } // Server runs HTTP, Mux and a grpc server @@ -123,6 +124,25 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { readerReady := make(chan bool) + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) + if err != nil { + return nil, err + } + + fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) + if err != nil { + return nil, err + 
} + var reader interfaces.Reader storage := os.Getenv("STORAGE") if storage == "clickhouse" { @@ -137,6 +157,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.Cluster, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, + fluxIntervalForTraceDetail, + nil, ) go clickhouseReader.Start(readerReady) reader = clickhouseReader @@ -151,14 +173,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } } - var c cache.Cache - if serverOptions.CacheConfigPath != "" { - cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) - if err != nil { - return nil, err - } - c = cache.NewCache(cacheOpts) - } <-readerReady rm, err := makeRulesManager( @@ -169,11 +183,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } - fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) - if err != nil { - return nil, err - } - integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore.SQLxDB()) if err != nil { return nil, fmt.Errorf("couldn't create integrations controller: %w", err) diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index ac4ab91f9eb..0199113a74e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -41,6 +41,8 @@ type Reader interface { // Search Interfaces SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) + GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) + GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req 
*model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 76498d7372d..7fd7c357924 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -45,7 +45,7 @@ func main() { var useLogsNewSchema bool var useTraceNewSchema bool // the url used to build link in the alert messages in slack and other systems - var ruleRepoURL, cacheConfigPath, fluxInterval string + var ruleRepoURL, cacheConfigPath, fluxInterval, fluxIntervalForTraceDetail string var cluster string var preferSpanMetrics bool @@ -63,6 +63,7 @@ func main() { flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)") + flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)") flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')") // Allow using the consistent naming with the signoz collector flag.StringVar(&cluster, "cluster-name", "cluster", "(cluster name - defaults to 'cluster')") @@ -95,23 +96,24 @@ func main() { } serverOptions := &app.ServerOptions{ - Config: config, - HTTPHostPort: constants.HTTPHostPort, - PromConfigPath: promConfigPath, - SkipTopLvlOpsPath: skipTopLvlOpsPath, - PreferSpanMetrics: preferSpanMetrics, - PrivateHostPort: constants.PrivateHostPort, - DisableRules: disableRules, - RuleRepoURL: ruleRepoURL, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - 
DialTimeout: dialTimeout, - CacheConfigPath: cacheConfigPath, - FluxInterval: fluxInterval, - Cluster: cluster, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, - SigNoz: signoz, + Config: config, + HTTPHostPort: constants.HTTPHostPort, + PromConfigPath: promConfigPath, + SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferSpanMetrics: preferSpanMetrics, + PrivateHostPort: constants.PrivateHostPort, + DisableRules: disableRules, + RuleRepoURL: ruleRepoURL, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, + FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, + Cluster: cluster, + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, + SigNoz: signoz, } // Read the jwt secret key diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 342f8f10f0b..e2717fbd876 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -315,6 +315,16 @@ type SearchTracesParams struct { MaxSpansInTrace int `json:"maxSpansInTrace"` } +type GetWaterfallSpansForTraceWithMetadataParams struct { + SelectedSpanID string `json:"selectedSpanId"` + IsSelectedSpanIDUnCollapsed bool `json:"isSelectedSpanIDUnCollapsed"` + UncollapsedSpans []string `json:"uncollapsedSpans"` +} + +type GetFlamegraphSpansForTraceParams struct { + SelectedSpanID string `json:"selectedSpanId"` +} + type SpanFilterParams struct { TraceID []string `json:"traceID"` Status []string `json:"status"` diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index fe8aec6765f..7ef1c1d9484 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -269,6 +269,102 @@ type SearchSpanResponseItem struct { SpanKind string `json:"spanKind"` } +type Span struct { + TimeUnixNano uint64 `json:"timestamp"` + DurationNano uint64 
`json:"durationNano"` + SpanID string `json:"spanId"` + RootSpanID string `json:"rootSpanId"` + ParentSpanId string `json:"parentSpanId"` + TraceID string `json:"traceId"` + HasError bool `json:"hasError"` + Kind int32 `json:"kind"` + ServiceName string `json:"serviceName"` + Name string `json:"name"` + References []OtelSpanRef `json:"references,omitempty"` + TagMap map[string]string `json:"tagMap"` + Events []string `json:"event"` + RootName string `json:"rootName"` + StatusMessage string `json:"statusMessage"` + StatusCodeString string `json:"statusCodeString"` + SpanKind string `json:"spanKind"` + Children []*Span `json:"children"` + + // the below two fields are for frontend to render the spans + SubTreeNodeCount uint64 `json:"subTreeNodeCount"` + HasChildren bool `json:"hasChildren"` + HasSiblings bool `json:"hasSiblings"` + Level uint64 `json:"level"` +} + +type FlamegraphSpan struct { + TimeUnixNano uint64 `json:"timestamp"` + DurationNano uint64 `json:"durationNano"` + SpanID string `json:"spanId"` + ParentSpanId string `json:"parentSpanId"` + TraceID string `json:"traceId"` + HasError bool `json:"hasError"` + ServiceName string `json:"serviceName"` + Name string `json:"name"` + Level int64 `json:"level"` + References []OtelSpanRef `json:"references,omitempty"` + Children []*FlamegraphSpan `json:"children"` +} + +type GetWaterfallSpansForTraceWithMetadataCache struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + DurationNano uint64 `json:"durationNano"` + TotalSpans uint64 `json:"totalSpans"` + TotalErrorSpans uint64 `json:"totalErrorSpans"` + ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"` + SpanIdToSpanNodeMap map[string]*Span `json:"spanIdToSpanNodeMap"` + TraceRoots []*Span `json:"traceRoots"` +} + +func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) { + return json.Marshal(c) +} +func (c *GetWaterfallSpansForTraceWithMetadataCache) 
UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, c) +} + +type GetFlamegraphSpansForTraceCache struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + DurationNano uint64 `json:"durationNano"` + SelectedSpans [][]*FlamegraphSpan `json:"selectedSpans"` + TraceRoots []*FlamegraphSpan `json:"traceRoots"` +} + +func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) { + return json.Marshal(c) +} +func (c *GetFlamegraphSpansForTraceCache) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, c) +} + +type GetWaterfallSpansForTraceWithMetadataResponse struct { + StartTimestampMillis uint64 `json:"startTimestampMillis"` + EndTimestampMillis uint64 `json:"endTimestampMillis"` + DurationNano uint64 `json:"durationNano"` + RootServiceName string `json:"rootServiceName"` + RootServiceEntryPoint string `json:"rootServiceEntryPoint"` + TotalSpansCount uint64 `json:"totalSpansCount"` + TotalErrorSpansCount uint64 `json:"totalErrorSpansCount"` + ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"` + Spans []*Span `json:"spans"` + HasMissingSpans bool `json:"hasMissingSpans"` + // this is needed for frontend and query service sync + UncollapsedSpans []string `json:"uncollapsedSpans"` +} + +type GetFlamegraphSpansForTraceResponse struct { + StartTimestampMillis uint64 `json:"startTimestampMillis"` + EndTimestampMillis uint64 `json:"endTimestampMillis"` + DurationNano uint64 `json:"durationNano"` + Spans [][]*FlamegraphSpan `json:"spans"` +} + type OtelSpanRef struct { TraceId string `json:"traceId,omitempty"` SpanId string `json:"spanId,omitempty"` diff --git a/pkg/query-service/model/trace.go b/pkg/query-service/model/trace.go index e8d3d70ac27..ccee5a83d24 100644 --- a/pkg/query-service/model/trace.go +++ b/pkg/query-service/model/trace.go @@ -20,6 +20,7 @@ type SpanItemV2 struct { StatusMessage string `ch:"status_message"` StatusCodeString string 
`ch:"status_code_string"` SpanKind string `ch:"kind_string"` + ParentSpanId string `ch:"parent_span_id"` } type TraceSummary struct { diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 5a4ab9c9708..7218ca2167a 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -1241,7 +1241,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1340,7 +1340,7 @@ func TestThresholdRuleNoData(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1448,7 +1448,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1573,7 +1573,7 @@ 
func TestThresholdRuleLogsLink(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index c1c71808a99..e2db99d0190 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -47,6 +47,8 @@ func NewMockClickhouseReader( "", true, true, + time.Duration(time.Second), + nil, ) return reader, mockDB From 90476d71c4ec36c9a738d34518ab0d38820c1033 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 23 Jan 2025 17:16:18 +0530 Subject: [PATCH 2/5] feat(trace-detail): refactor query service trace apis --- ee/query-service/app/db/reader.go | 6 +- ee/query-service/app/server.go | 27 +- .../app/clickhouseReader/reader.go | 279 ++++-------------- .../app/clickhouseReader/utils.go | 125 -------- pkg/query-service/app/http_handler.go | 6 +- pkg/query-service/app/server.go | 28 +- .../app/traces/tracedetail/flamegraph.go | 118 ++++++++ .../app/traces/tracedetail/waterfall.go | 228 ++++++++++++++ pkg/query-service/model/cacheable.go | 36 +++ pkg/query-service/model/response.go | 42 +-- 10 files changed, 485 insertions(+), 410 deletions(-) delete mode 100644 pkg/query-service/app/clickhouseReader/utils.go create mode 100644 pkg/query-service/app/traces/tracedetail/flamegraph.go create mode 100644 pkg/query-service/app/traces/tracedetail/waterfall.go create mode 100644 pkg/query-service/model/cacheable.go diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index 
6c01f965519..071ec70d199 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -7,7 +7,7 @@ import ( "github.com/jmoiron/sqlx" - cacheV2 "go.signoz.io/signoz/pkg/cache" + "go.signoz.io/signoz/pkg/cache" basechr "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/interfaces" ) @@ -30,9 +30,9 @@ func NewDataConnector( useLogsNewSchema bool, useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, - cacheV2 cacheV2.Cache, + cache cache.Cache, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cacheV2) + ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 7377e3a671c..834fd950496 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -142,24 +142,10 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } - var c cache.Cache - if serverOptions.CacheConfigPath != "" { - cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) - if err != nil { - return nil, err - } - c = cache.NewCache(cacheOpts) - } - // set license manager as feature flag provider in dao modelDao.SetFlagProvider(lm) readerReady := make(chan bool) - fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) - if err != nil { - return nil, err - } - fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) if err != nil { return nil, err @@ -195,6 +181,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } } + var c cache.Cache + if 
serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } <-readerReady rm, err := makeRulesManager(serverOptions.PromConfigPath, @@ -263,6 +257,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetReader(reader) telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey) + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) + if err != nil { + return nil, err + } + apiOpts := api.APIHandlerOptions{ DataConnector: reader, SkipConfig: skipConfig, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 110625c60ba..2dbf6e16bb8 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -34,7 +34,6 @@ import ( "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/jmoiron/sqlx" "go.signoz.io/signoz/pkg/cache" - cacheV2 "go.signoz.io/signoz/pkg/cache" promModel "github.com/prometheus/common/model" "go.uber.org/zap" @@ -43,6 +42,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/logs" "go.signoz.io/signoz/pkg/query-service/app/resource" "go.signoz.io/signoz/pkg/query-service/app/services" + "go.signoz.io/signoz/pkg/query-service/app/traces/tracedetail" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" @@ -160,7 +160,7 @@ type ClickHouseReader struct { traceSummaryTable string fluxIntervalForTraceDetail time.Duration - cacheV2 cacheV2.Cache + cache cache.Cache } // NewTraceReader returns a TraceReader for the database @@ -175,7 +175,7 @@ func NewReader( useLogsNewSchema bool, useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, - cacheV2 cacheV2.Cache, + cache cache.Cache, ) *ClickHouseReader { datasource := 
os.Getenv("ClickHouseUrl") @@ -186,7 +186,7 @@ func NewReader( zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) } - return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cacheV2) + return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) } func NewReaderFromClickhouseConnection( @@ -199,7 +199,7 @@ func NewReaderFromClickhouseConnection( useLogsNewSchema bool, useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, - cacheV2 cacheV2.Cache, + cache cache.Cache, ) *ClickHouseReader { alertManager, err := am.New() if err != nil { @@ -288,7 +288,7 @@ func NewReaderFromClickhouseConnection( traceSummaryTable: options.primary.TraceSummaryTable, fluxIntervalForTraceDetail: fluxIntervalForTraceDetail, - cacheV2: cacheV2, + cache: cache, } } @@ -1454,73 +1454,19 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc return &searchSpansResult, nil } -type Interval struct { - StartTime uint64 - Duration uint64 - Service string -} - -func calculateServiceTime(serviceIntervals map[string][]Interval) map[string]uint64 { - totalTimes := make(map[string]uint64) - - for service, serviceIntervals := range serviceIntervals { - sort.Slice(serviceIntervals, func(i, j int) bool { - return serviceIntervals[i].StartTime < serviceIntervals[j].StartTime - }) - mergedIntervals := mergeIntervals(serviceIntervals) - totalTime := uint64(0) - for _, interval := range mergedIntervals { - totalTime += interval.Duration - } - totalTimes[service] = totalTime - } - - return totalTimes -} - -func mergeIntervals(intervals []Interval) []Interval { - if len(intervals) == 0 { - return nil - } - - var merged []Interval - current := intervals[0] - - for i := 1; i < len(intervals); i++ { - next := intervals[i] - if 
current.StartTime+current.Duration >= next.StartTime { - endTime := max(current.StartTime+current.Duration, next.StartTime+next.Duration) - current.Duration = endTime - current.StartTime - } else { - merged = append(merged, current) - current = next - } - } - // Add the last interval - merged = append(merged, current) - - return merged -} - -func max(a, b uint64) uint64 { - if a > b { - return a - } - return b -} - func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) { response := new(model.GetWaterfallSpansForTraceWithMetadataResponse) - var startTime, endTime, durationNano, totalErrorSpans uint64 + var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64 var spanIdToSpanNodeMap = map[string]*model.Span{} var traceRoots []*model.Span var serviceNameToTotalDurationMap = map[string]uint64{} + var serviceNameIntervalMap = map[string][]tracedetail.Interval{} var useCache bool = true cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache) - cacheStatus, err := r.cacheV2.Retrieve(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false) + cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false) if err != nil { - zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err)) + zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err), zap.String("traceID", traceID)) useCache = false } if cacheStatus != cache.RetrieveStatusHit { @@ -1530,6 +1476,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con if err == nil && cacheStatus == cache.RetrieveStatusHit { if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + 
zap.L().Info("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache", zap.String("traceID", traceID)) useCache = false } @@ -1541,7 +1488,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap traceRoots = cachedTraceData.TraceRoots - response.TotalSpansCount = cachedTraceData.TotalSpans + totalSpans = cachedTraceData.TotalSpans totalErrorSpans = cachedTraceData.TotalErrorSpans } @@ -1557,30 +1504,28 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con if err == sql.ErrNoRows { return response, nil } - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + zap.L().Error("error in processing trace summary sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err)) } - response.TotalSpansCount = traceSummary.NumSpans + totalSpans = traceSummary.NumSpans var searchScanResponses []model.SpanItemV2 - query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string , parent_span_id FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) - start := time.Now() + query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and 
ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) + queryStartTime := time.Now() err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) zap.L().Info(query) if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + zap.L().Error("error in processing trace data sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) } - end := time.Now() - zap.L().Debug("GetWaterfallSpansForTraceWithMetadata took: ", zap.Duration("duration", end.Sub(start))) + zap.L().Info("getWaterfallSpansForTraceWithMetadata trace data sql query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) - var serviceNameIntervalMap = map[string][]Interval{} for _, item := range searchScanResponses { ref := []model.OtelSpanRef{} err := json.Unmarshal([]byte(item.References), &ref) if err != nil { - zap.L().Error("Error unmarshalling references", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)} + zap.L().Error("getWaterfallSpansForTraceWithMetadata: error unmarshalling references", zap.Error(err), zap.String("traceID", traceID)) + return nil, model.BadRequest(fmt.Errorf("getWaterfallSpansForTraceWithMetadata: error unmarshalling references %w", err)) } // merge attributes_number and attributes_bool to attributes_string @@ -1608,17 +1553,20 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con References: ref, Events: item.Events, TagMap: item.Attributes_string, - ParentSpanId: item.ParentSpanId, Children: make([]*model.Span, 0), } + // convert start timestamp to millis 
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + // collect the intervals for service for execution time calculation serviceNameIntervalMap[jsonItem.ServiceName] = - append(serviceNameIntervalMap[jsonItem.ServiceName], Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano / 1000000, Service: jsonItem.ServiceName}) + append(serviceNameIntervalMap[jsonItem.ServiceName], tracedetail.Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano / 1000000, Service: jsonItem.ServiceName}) + // append to the span node map spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem + // metadata calculation if startTime == 0 || jsonItem.TimeUnixNano < startTime { startTime = jsonItem.TimeUnixNano } @@ -1633,19 +1581,18 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con } } - serviceNameToTotalDurationMap = calculateServiceTime(serviceNameIntervalMap) - // traverse through the map and append each node to the children array of the parent node - // capture the root nodes as well + // and add the missing spans for _, spanNode := range spanIdToSpanNodeMap { - hasParentRelationship := false + hasParentSpanNode := false for _, reference := range spanNode.References { if reference.RefType == "CHILD_OF" && reference.SpanId != "" { - hasParentRelationship = true + hasParentSpanNode = true + if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists { parentNode.Children = append(parentNode.Children, spanNode) } else { - // insert the missing spans + // insert the missing span missingSpan := model.Span{ SpanID: reference.SpanId, TraceID: spanNode.TraceID, @@ -1666,11 +1613,12 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con } } } - if !hasParentRelationship { + if !hasParentSpanNode && !tracedetail.ContainsWaterfallSpan(traceRoots, spanNode) { traceRoots = append(traceRoots, spanNode) } } + // sort the trace roots to add missing spans at the right order 
sort.Slice(traceRoots, func(i, j int) bool { if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano { return traceRoots[i].Name < traceRoots[j].Name @@ -1678,6 +1626,8 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano }) + serviceNameToTotalDurationMap = tracedetail.CalculateServiceTime(serviceNameIntervalMap) + traceCache := model.GetWaterfallSpansForTraceWithMetadataCache{ StartTime: startTime, EndTime: endTime, @@ -1689,67 +1639,22 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con TraceRoots: traceRoots, } - err = r.cacheV2.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5) + err = r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5) if err != nil { - zap.L().Debug("failed to store cache fpr getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err)) - } - } - - var preOrderTraversal = []*model.Span{} - uncollapsedSpans := req.UncollapsedSpans - - selectedSpanIndex := -1 - for _, rootSpanID := range traceRoots { - if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists { - _, _spansFromRootToNode := getPathFromRootToSelectedSpanId(rootNode, req.SelectedSpanID, uncollapsedSpans, req.IsSelectedSpanIDUnCollapsed) - uncollapsedSpans = append(uncollapsedSpans, _spansFromRootToNode...) - _preOrderTraversal := traverseTraceAndAddRequiredMetadata(rootNode, uncollapsedSpans, 0, true, false, req.SelectedSpanID) - - _selectedSpanIndex := findIndexForSelectedSpanFromPreOrder(_preOrderTraversal, req.SelectedSpanID) - if _selectedSpanIndex != -1 { - selectedSpanIndex = _selectedSpanIndex + len(preOrderTraversal) - } - preOrderTraversal = append(preOrderTraversal, _preOrderTraversal...) 
- - if response.RootServiceName == "" { - response.RootServiceName = rootNode.ServiceName - } - - if response.RootServiceEntryPoint == "" { - response.RootServiceEntryPoint = rootNode.Name - } + zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err)) } } - // the index of the interested span id shouldn't be -1 as the span should exist - if selectedSpanIndex == -1 && req.SelectedSpanID != "" { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("selected span ID not found in the traversal")} - } - // get the 0.4*[span limit] before the interested span index - startIndex := selectedSpanIndex - 200 - // get the 0.6*[span limit] after the intrested span index - endIndex := selectedSpanIndex + 300 - // adjust the sliding window according to the available left and right spaces. - if startIndex < 0 { - endIndex = endIndex - startIndex - startIndex = 0 - } - if endIndex > len(preOrderTraversal) { - startIndex = startIndex - (endIndex - len(preOrderTraversal)) - endIndex = len(preOrderTraversal) - } - - if startIndex < 0 { - startIndex = 0 - } - selectedSpans := preOrderTraversal[startIndex:endIndex] + selectedSpans, uncollapsedSpans, rootServiceName, rootServiceEntryPoint := tracedetail.GetSelectedSpans(req.UncollapsedSpans, req.SelectedSpanID, traceRoots, spanIdToSpanNodeMap, req.IsSelectedSpanIDUnCollapsed) - // generate the response [ spans , metadata ] response.Spans = selectedSpans response.UncollapsedSpans = uncollapsedSpans response.StartTimestampMillis = startTime response.EndTimestampMillis = endTime + response.TotalSpansCount = totalSpans response.TotalErrorSpansCount = totalErrorSpans + response.RootServiceName = rootServiceName + response.RootServiceEntryPoint = rootServiceEntryPoint response.ServiceNameToTotalDurationMap = serviceNameToTotalDurationMap response.HasMissingSpans = len(traceRoots) > 1 return response, nil @@ -1760,16 +1665,15 @@ func (r *ClickHouseReader) 
GetFlamegraphSpansForTrace(ctx context.Context, trace var startTime, endTime, durationNano uint64 var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{} // map[traceID][level]span - var traceIdLevelledFlamegraph = map[string]map[int64][]*model.FlamegraphSpan{} var selectedSpans = [][]*model.FlamegraphSpan{} var traceRoots []*model.FlamegraphSpan var useCache bool = true // get the trace tree from cache! cachedTraceData := new(model.GetFlamegraphSpansForTraceCache) - cacheStatus, err := r.cacheV2.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false) + cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false) if err != nil { - zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err)) + zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err), zap.String("traceID", traceID)) useCache = false } @@ -1780,6 +1684,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace if err == nil && cacheStatus == cache.RetrieveStatusHit { if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + zap.L().Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", zap.String("traceID", traceID)) useCache = false } @@ -1796,7 +1701,6 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace if !useCache { zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) - // fetch the start, end and number of spans from the summary table, start and end are required for the trace query var traceSummary model.TraceSummary summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) @@ -1804,25 
+1708,21 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace if err == sql.ErrNoRows { return trace, nil } - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + zap.L().Error("Error in processing trace summary sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err)) } - // fetch all the spans belonging to the trace from the main table var searchScanResponses []model.SpanItemV2 - query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name,parent_span_id FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) - start := time.Now() + query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) + queryStartTime := time.Now() err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) zap.L().Info(query) if err != nil { zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %w", err)} + return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) } - end := time.Now() - zap.L().Debug("getFlamegraphSpansForTrace took: ", zap.Duration("duration", end.Sub(start))) + zap.L().Info("getFlamegraphSpansForTrace took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) - // create the trace tree based on the 
spans fetched above - // create a map of [spanId]: spanNode for _, item := range searchScanResponses { ref := []model.OtelSpanRef{} err := json.Unmarshal([]byte(item.References), &ref) @@ -1830,21 +1730,19 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace zap.L().Error("Error unmarshalling references", zap.Error(err)) return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)} } - // create the span node + jsonItem := model.FlamegraphSpan{ SpanID: item.SpanID, TraceID: item.TraceID, ServiceName: item.ServiceName, Name: item.Name, - DurationNano: (item.DurationNano), + DurationNano: item.DurationNano, HasError: item.HasError, - ParentSpanId: item.ParentSpanId, References: ref, Children: make([]*model.FlamegraphSpan, 0), } jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) - // assign the span node to the span map spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem // metadata calculation @@ -1854,17 +1752,18 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime { endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000) } - if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { - durationNano = uint64(jsonItem.DurationNano) + if durationNano == 0 || jsonItem.DurationNano > durationNano { + durationNano = jsonItem.DurationNano } } // traverse through the map and append each node to the children array of the parent node + // and add missing spans for _, spanNode := range spanIdToSpanNodeMap { - hasParentRelationship := false + hasParentSpanNode := false for _, reference := range spanNode.References { if reference.RefType == "CHILD_OF" && reference.SpanId != "" { - hasParentRelationship = true + hasParentSpanNode = true if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists { parentNode.Children = 
append(parentNode.Children, spanNode) } else { @@ -1885,43 +1784,12 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace } } } - if !hasParentRelationship { + if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) { traceRoots = append(traceRoots, spanNode) } } - sort.Slice(traceRoots, func(i, j int) bool { - if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano { - return traceRoots[i].Name < traceRoots[j].Name - } - return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano - }) - - var bfsMapForTrace = map[int64][]*model.FlamegraphSpan{} - for _, rootSpanID := range traceRoots { - if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists { - bfsMapForTrace = map[int64][]*model.FlamegraphSpan{} - bfsTraversalForTrace(rootNode, 0, &bfsMapForTrace) - traceIdLevelledFlamegraph[rootSpanID.SpanID] = bfsMapForTrace - } - } - - for _, trace := range traceRoots { - keys := make([]int64, 0, len(traceIdLevelledFlamegraph[trace.SpanID])) - for key := range traceIdLevelledFlamegraph[trace.SpanID] { - keys = append(keys, key) - } - sort.Slice(keys, func(i, j int) bool { - return keys[i] < keys[j] - }) - - for _, level := range keys { - if ok, exists := traceIdLevelledFlamegraph[trace.SpanID][level]; exists { - selectedSpans = append(selectedSpans, ok) - } - } - } - + selectedSpans := tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap) traceCache := model.GetFlamegraphSpansForTraceCache{ StartTime: startTime, EndTime: endTime, @@ -1930,35 +1798,14 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace TraceRoots: traceRoots, } - err = r.cacheV2.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5) + err = r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5) if err != nil { zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", 
zap.String("traceID", traceID), zap.Error(err)) } } - var selectedIndex int64 = 0 - if req.SelectedSpanID != "" { - selectedIndex = findIndexForSelectedSpan(selectedSpans, req.SelectedSpanID) - } - - lowerLimit := selectedIndex - 20 - upperLimit := selectedIndex + 30 - - if lowerLimit < 0 { - upperLimit = upperLimit - lowerLimit - lowerLimit = 0 - } - - if upperLimit > int64(len(selectedSpans)) { - lowerLimit = lowerLimit - (upperLimit - int64(len(selectedSpans))) - upperLimit = int64(len(selectedSpans)) - } - - if lowerLimit < 0 { - lowerLimit = 0 - } - - trace.Spans = selectedSpans[lowerLimit:upperLimit] + selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans) + trace.Spans = selectedSpansForRequest trace.StartTimestampMillis = startTime trace.EndTimestampMillis = endTime return trace, nil diff --git a/pkg/query-service/app/clickhouseReader/utils.go b/pkg/query-service/app/clickhouseReader/utils.go deleted file mode 100644 index 19f3203063d..00000000000 --- a/pkg/query-service/app/clickhouseReader/utils.go +++ /dev/null @@ -1,125 +0,0 @@ -package clickhouseReader - -import ( - "sort" - - "go.signoz.io/signoz/pkg/query-service/model" -) - -func contains(slice []string, item string) bool { - for _, v := range slice { - if v == item { - return true - } - } - return false -} - -func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, uncollapsedSpans []string, isSelectedSpanIDUnCollapsed bool) (bool, []string) { - spansFromRootToNode := []string{} - if node.SpanID == selectedSpanId { - if isSelectedSpanIDUnCollapsed { - spansFromRootToNode = append(spansFromRootToNode, node.SpanID) - } - return true, spansFromRootToNode - } - isPresentInSubtreeForTheNode := false - for _, child := range node.Children { - isPresentInThisSubtree, _spansFromRootToNode := getPathFromRootToSelectedSpanId(child, selectedSpanId, uncollapsedSpans, isSelectedSpanIDUnCollapsed) - // if the interested node is 
present in the given subtree then add the span node to uncollapsed node list - if isPresentInThisSubtree { - if !contains(uncollapsedSpans, node.SpanID) { - spansFromRootToNode = append(spansFromRootToNode, node.SpanID) - } - isPresentInSubtreeForTheNode = true - spansFromRootToNode = append(spansFromRootToNode, _spansFromRootToNode...) - } - } - return isPresentInSubtreeForTheNode, spansFromRootToNode -} - -func traverseTraceAndAddRequiredMetadata(span *model.Span, uncollapsedSpans []string, level uint64, isPartOfPreorder bool, hasSibling bool, selectedSpanId string) []*model.Span { - preOrderTraversal := []*model.Span{} - sort.Slice(span.Children, func(i, j int) bool { - if span.Children[i].TimeUnixNano == span.Children[j].TimeUnixNano { - return span.Children[i].Name < span.Children[j].Name - } - return span.Children[i].TimeUnixNano < span.Children[j].TimeUnixNano - }) - span.SubTreeNodeCount = 0 - nodeWithoutChildren := model.Span{ - SpanID: span.SpanID, - TraceID: span.TraceID, - ServiceName: span.ServiceName, - TimeUnixNano: span.TimeUnixNano, - Name: span.Name, - Kind: int32(span.Kind), - DurationNano: span.DurationNano, - HasError: span.HasError, - StatusMessage: span.StatusMessage, - StatusCodeString: span.StatusCodeString, - SpanKind: span.SpanKind, - References: span.References, - Events: span.Events, - TagMap: span.TagMap, - ParentSpanId: span.ParentSpanId, - Children: make([]*model.Span, 0), - HasChildren: len(span.Children) > 0, - Level: level, - HasSiblings: hasSibling, - SubTreeNodeCount: 0, - } - if isPartOfPreorder { - preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren) - } - - for index, child := range span.Children { - _childTraversal := traverseTraceAndAddRequiredMetadata(child, uncollapsedSpans, level+1, isPartOfPreorder && contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId) - preOrderTraversal = append(preOrderTraversal, _childTraversal...) 
- nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1 - span.SubTreeNodeCount += child.SubTreeNodeCount + 1 - } - - return preOrderTraversal - -} - -func bfsTraversalForTrace(span *model.FlamegraphSpan, level int64, bfsMap *map[int64][]*model.FlamegraphSpan) { - ok, exists := (*bfsMap)[level] - span.Level = level - if exists { - (*bfsMap)[level] = append(ok, span) - } else { - (*bfsMap)[level] = []*model.FlamegraphSpan{span} - } - for _, child := range span.Children { - bfsTraversalForTrace(child, level+1, bfsMap) - } - span.Children = make([]*model.FlamegraphSpan, 0) -} - -func findIndexForSelectedSpan(spans [][]*model.FlamegraphSpan, selectedSpanId string) int64 { - var selectedSpanLevel int64 = 0 - - for index, _spans := range spans { - if len(_spans) > 0 && _spans[0].SpanID == selectedSpanId { - selectedSpanLevel = int64(index) - break - } - } - - return selectedSpanLevel -} - -func findIndexForSelectedSpanFromPreOrder(spans []*model.Span, selectedSpanId string) int { - var selectedSpanIndex = -1 - - for index, span := range spans { - if span.SpanID == selectedSpanId { - selectedSpanIndex = index - break - } - } - - return selectedSpanIndex -} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6a764c51de1..e0551440b78 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -547,7 +547,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet) router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost) router.HandleFunc("/api/v2/traces/flamegraph/{traceId}", am.ViewAccess(aH.GetFlamegraphSpansForTrace)).Methods(http.MethodPost) - router.HandleFunc("/api/v2/traces/{traceId}", am.ViewAccess(aH.GetWaterfallSpansForTraceWithMetadata)).Methods(http.MethodPost) + 
router.HandleFunc("/api/v2/traces/waterfall/{traceId}", am.ViewAccess(aH.GetWaterfallSpansForTraceWithMetadata)).Methods(http.MethodPost) router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet) router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet) @@ -1782,7 +1782,7 @@ func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) GetWaterfallSpansForTraceWithMetadata(w http.ResponseWriter, r *http.Request) { traceID := mux.Vars(r)["traceId"] if traceID == "" { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: errors.New("traceID is required")}, nil) + RespondError(w, model.BadRequest(errors.New("traceID is required")), nil) return } @@ -1805,7 +1805,7 @@ func (aH *APIHandler) GetWaterfallSpansForTraceWithMetadata(w http.ResponseWrite func (aH *APIHandler) GetFlamegraphSpansForTrace(w http.ResponseWriter, r *http.Request) { traceID := mux.Vars(r)["traceId"] if traceID == "" { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: errors.New("traceID is required")}, nil) + RespondError(w, model.BadRequest(errors.New("traceID is required")), nil) return } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 3ad81b8223a..823d0e48dc5 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -124,20 +124,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { readerReady := make(chan bool) - var c cache.Cache - if serverOptions.CacheConfigPath != "" { - cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) - if err != nil { - return nil, err - } - c = cache.NewCache(cacheOpts) - } - - fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) - if err != nil { - return nil, err - } - fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail) if err != nil { return nil, err @@ -174,6 
+160,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } } + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + <-readerReady rm, err := makeRulesManager( serverOptions.PromConfigPath, @@ -183,6 +178,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) + if err != nil { + return nil, err + } + integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore.SQLxDB()) if err != nil { return nil, fmt.Errorf("couldn't create integrations controller: %w", err) diff --git a/pkg/query-service/app/traces/tracedetail/flamegraph.go b/pkg/query-service/app/traces/tracedetail/flamegraph.go new file mode 100644 index 00000000000..02ffd091b54 --- /dev/null +++ b/pkg/query-service/app/traces/tracedetail/flamegraph.go @@ -0,0 +1,118 @@ +package tracedetail + +import ( + "sort" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +var ( + SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH float64 = 50 +) + +func ContainsFlamegraphSpan(slice []*model.FlamegraphSpan, item *model.FlamegraphSpan) bool { + for _, v := range slice { + if v.SpanID == item.SpanID { + return true + } + } + return false +} + +func BfsTraversalForTrace(span *model.FlamegraphSpan, level int64) map[int64][]*model.FlamegraphSpan { + bfs := map[int64][]*model.FlamegraphSpan{} + bfs[level] = []*model.FlamegraphSpan{span} + + for _, child := range span.Children { + childBfsMap := BfsTraversalForTrace(child, level+1) + for _level, nodes := range childBfsMap { + bfs[_level] = append(bfs[_level], nodes...) 
+ } + } + span.Level = level + span.Children = make([]*model.FlamegraphSpan, 0) + + return bfs +} + +func FindIndexForSelectedSpan(spans [][]*model.FlamegraphSpan, selectedSpanId string) int { + var selectedSpanLevel int = 0 + + for index, _spans := range spans { + if len(_spans) > 0 && _spans[0].SpanID == selectedSpanId { + selectedSpanLevel = index + break + } + } + + return selectedSpanLevel +} + +func GetSelectedSpansForFlamegraph(traceRoots []*model.FlamegraphSpan, spanIdToSpanNodeMap map[string]*model.FlamegraphSpan) [][]*model.FlamegraphSpan { + + var traceIdLevelledFlamegraph = map[string]map[int64][]*model.FlamegraphSpan{} + selectedSpans := [][]*model.FlamegraphSpan{} + + // sort the trace roots to add missing spans at the right order + sort.Slice(traceRoots, func(i, j int) bool { + if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano { + return traceRoots[i].Name < traceRoots[j].Name + } + return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano + }) + + for _, rootSpanID := range traceRoots { + if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists { + bfsMapForTrace := BfsTraversalForTrace(rootNode, 0) + traceIdLevelledFlamegraph[rootSpanID.SpanID] = bfsMapForTrace + } + } + + for _, trace := range traceRoots { + keys := make([]int64, 0, len(traceIdLevelledFlamegraph[trace.SpanID])) + for key := range traceIdLevelledFlamegraph[trace.SpanID] { + keys = append(keys, key) + } + + sort.Slice(keys, func(i, j int) bool { + return keys[i] < keys[j] + }) + + for _, level := range keys { + if ok, exists := traceIdLevelledFlamegraph[trace.SpanID][level]; exists { + selectedSpans = append(selectedSpans, ok) + } + } + } + + return selectedSpans +} + +func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpans [][]*model.FlamegraphSpan) [][]*model.FlamegraphSpan { + + selectedSpansForRequest := [][]*model.FlamegraphSpan{} + var selectedIndex = 0 + + if selectedSpanID != "" { + selectedIndex = 
FindIndexForSelectedSpan(selectedSpans, selectedSpanID) + } + + lowerLimit := selectedIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.4) + upperLimit := selectedIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.6) + + if lowerLimit < 0 { + upperLimit = upperLimit - lowerLimit + lowerLimit = 0 + } + + if upperLimit > len(selectedSpans) { + lowerLimit = lowerLimit - (upperLimit - len(selectedSpans)) + upperLimit = (len(selectedSpans)) + } + + if lowerLimit < 0 { + lowerLimit = 0 + } + + return selectedSpansForRequest[lowerLimit:upperLimit] +} diff --git a/pkg/query-service/app/traces/tracedetail/waterfall.go b/pkg/query-service/app/traces/tracedetail/waterfall.go new file mode 100644 index 00000000000..0dbb37b28be --- /dev/null +++ b/pkg/query-service/app/traces/tracedetail/waterfall.go @@ -0,0 +1,228 @@ +package tracedetail + +import ( + "sort" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +var ( + SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL float64 = 500 +) + +type Interval struct { + StartTime uint64 + Duration uint64 + Service string +} + +func max(a, b uint64) uint64 { + if a > b { + return a + } + return b +} + +func mergeIntervals(intervals []Interval) []Interval { + if len(intervals) == 0 { + return nil + } + + var merged []Interval + current := intervals[0] + + for i := 1; i < len(intervals); i++ { + next := intervals[i] + if current.StartTime+current.Duration >= next.StartTime { + endTime := max(current.StartTime+current.Duration, next.StartTime+next.Duration) + current.Duration = endTime - current.StartTime + } else { + merged = append(merged, current) + current = next + } + } + // Add the last interval + merged = append(merged, current) + + return merged +} + +func contains(slice []string, item string) bool { + for _, v := range slice { + if v == item { + return true + } + } + return false +} + +func ContainsWaterfallSpan(slice []*model.Span, item *model.Span) bool { + for _, v := range slice { + if v.SpanID == item.SpanID { + return true + } + } 
+ return false +} + +func findIndexForSelectedSpanFromPreOrder(spans []*model.Span, selectedSpanId string) int { + var selectedSpanIndex = -1 + + for index, span := range spans { + if span.SpanID == selectedSpanId { + selectedSpanIndex = index + break + } + } + + return selectedSpanIndex +} + +func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, uncollapsedSpans []string, isSelectedSpanIDUnCollapsed bool) (bool, []string) { + spansFromRootToNode := []string{} + + if node.SpanID == selectedSpanId { + if isSelectedSpanIDUnCollapsed { + spansFromRootToNode = append(spansFromRootToNode, node.SpanID) + } + return true, spansFromRootToNode + } + + isPresentInSubtreeForTheNode := false + for _, child := range node.Children { + isPresentInThisSubtree, _spansFromRootToNode := getPathFromRootToSelectedSpanId(child, selectedSpanId, uncollapsedSpans, isSelectedSpanIDUnCollapsed) + // if the interested node is present in the given subtree then add the span node to uncollapsed node list + if isPresentInThisSubtree { + if !contains(uncollapsedSpans, node.SpanID) { + spansFromRootToNode = append(spansFromRootToNode, node.SpanID) + } + isPresentInSubtreeForTheNode = true + spansFromRootToNode = append(spansFromRootToNode, _spansFromRootToNode...) 
+ } + } + return isPresentInSubtreeForTheNode, spansFromRootToNode +} + +func traverseTrace(span *model.Span, uncollapsedSpans []string, level uint64, hasSibling bool, selectedSpanId string) []*model.Span { + preOrderTraversal := []*model.Span{} + + // sort the children to maintain the order across requests + sort.Slice(span.Children, func(i, j int) bool { + if span.Children[i].TimeUnixNano == span.Children[j].TimeUnixNano { + return span.Children[i].Name < span.Children[j].Name + } + return span.Children[i].TimeUnixNano < span.Children[j].TimeUnixNano + }) + + span.SubTreeNodeCount = 0 + nodeWithoutChildren := model.Span{ + SpanID: span.SpanID, + TraceID: span.TraceID, + ServiceName: span.ServiceName, + TimeUnixNano: span.TimeUnixNano, + Name: span.Name, + Kind: int32(span.Kind), + DurationNano: span.DurationNano, + HasError: span.HasError, + StatusMessage: span.StatusMessage, + StatusCodeString: span.StatusCodeString, + SpanKind: span.SpanKind, + References: span.References, + Events: span.Events, + TagMap: span.TagMap, + Children: make([]*model.Span, 0), + HasChildren: len(span.Children) > 0, + Level: level, + HasSiblings: hasSibling, + SubTreeNodeCount: 0, + } + + preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren) + + if contains(uncollapsedSpans, span.SpanID) { + for index, child := range span.Children { + _childTraversal := traverseTrace(child, uncollapsedSpans, level+1, index != (len(span.Children)-1), selectedSpanId) + preOrderTraversal = append(preOrderTraversal, _childTraversal...) 
+ nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1 + // span.SubTreeNodeCount += child.SubTreeNodeCount + 1 + } + } + + return preOrderTraversal + +} + +func CalculateServiceTime(serviceIntervals map[string][]Interval) map[string]uint64 { + totalTimes := make(map[string]uint64) + + for service, serviceIntervals := range serviceIntervals { + sort.Slice(serviceIntervals, func(i, j int) bool { + return serviceIntervals[i].StartTime < serviceIntervals[j].StartTime + }) + mergedIntervals := mergeIntervals(serviceIntervals) + totalTime := uint64(0) + for _, interval := range mergedIntervals { + totalTime += interval.Duration + } + totalTimes[service] = totalTime + } + + return totalTimes +} + +func GetSelectedSpans(uncollapsedSpans []string, selectedSpanID string, traceRoots []*model.Span, spanIdToSpanNodeMap map[string]*model.Span, isSelectedSpanIDUnCollapsed bool) ([]*model.Span, []string, string, string) { + + var preOrderTraversal = []*model.Span{} + var rootServiceName, rootServiceEntryPoint string + updatedUncollapsedSpans := uncollapsedSpans + + selectedSpanIndex := -1 + for _, rootSpanID := range traceRoots { + if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists { + _, spansFromRootToNode := getPathFromRootToSelectedSpanId(rootNode, selectedSpanID, updatedUncollapsedSpans, isSelectedSpanIDUnCollapsed) + updatedUncollapsedSpans = append(updatedUncollapsedSpans, spansFromRootToNode...) + + _preOrderTraversal := traverseTrace(rootNode, uncollapsedSpans, 0, false, selectedSpanID) + _selectedSpanIndex := findIndexForSelectedSpanFromPreOrder(_preOrderTraversal, selectedSpanID) + + if _selectedSpanIndex != -1 { + selectedSpanIndex = _selectedSpanIndex + len(preOrderTraversal) + } + + preOrderTraversal = append(preOrderTraversal, _preOrderTraversal...) 
+ + if rootServiceName == "" { + rootServiceName = rootNode.ServiceName + } + + if rootServiceEntryPoint == "" { + rootServiceEntryPoint = rootNode.Name + } + } + } + + // if we couldn't find the selectedSpan in the trace then defaulting the selected index to 0 + if selectedSpanIndex == -1 && selectedSpanID != "" { + selectedSpanIndex = 0 + } + + // get the 0.4*[span limit] before the interested span index + startIndex := selectedSpanIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL*0.4) + // get the 0.6*[span limit] after the intrested span index + endIndex := selectedSpanIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL*0.6) + + // adjust the sliding window according to the available left and right spaces. + if startIndex < 0 { + endIndex = endIndex - startIndex + startIndex = 0 + } + if endIndex > len(preOrderTraversal) { + startIndex = startIndex - (endIndex - len(preOrderTraversal)) + endIndex = len(preOrderTraversal) + } + if startIndex < 0 { + startIndex = 0 + } + + return preOrderTraversal[startIndex:endIndex], updatedUncollapsedSpans, rootServiceName, rootServiceEntryPoint +} diff --git a/pkg/query-service/model/cacheable.go b/pkg/query-service/model/cacheable.go new file mode 100644 index 00000000000..b691b2765a3 --- /dev/null +++ b/pkg/query-service/model/cacheable.go @@ -0,0 +1,36 @@ +package model + +import "encoding/json" + +type GetWaterfallSpansForTraceWithMetadataCache struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + DurationNano uint64 `json:"durationNano"` + TotalSpans uint64 `json:"totalSpans"` + TotalErrorSpans uint64 `json:"totalErrorSpans"` + ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"` + SpanIdToSpanNodeMap map[string]*Span `json:"spanIdToSpanNodeMap"` + TraceRoots []*Span `json:"traceRoots"` +} + +func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) { + return json.Marshal(c) +} +func (c 
*GetWaterfallSpansForTraceWithMetadataCache) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, c) +} + +type GetFlamegraphSpansForTraceCache struct { + StartTime uint64 `json:"startTime"` + EndTime uint64 `json:"endTime"` + DurationNano uint64 `json:"durationNano"` + SelectedSpans [][]*FlamegraphSpan `json:"selectedSpans"` + TraceRoots []*FlamegraphSpan `json:"traceRoots"` +} + +func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) { + return json.Marshal(c) +} +func (c *GetFlamegraphSpansForTraceCache) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, c) +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 7ef1c1d9484..650571dcf08 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -118,6 +118,13 @@ func ForbiddenError(err error) *ApiError { } } +func ExecutionError(err error) *ApiError { + return &ApiError{ + Typ: ErrorExec, + Err: err, + } +} + func WrapApiError(err *ApiError, msg string) *ApiError { return &ApiError{ Typ: err.Type(), @@ -274,7 +281,6 @@ type Span struct { DurationNano uint64 `json:"durationNano"` SpanID string `json:"spanId"` RootSpanID string `json:"rootSpanId"` - ParentSpanId string `json:"parentSpanId"` TraceID string `json:"traceId"` HasError bool `json:"hasError"` Kind int32 `json:"kind"` @@ -300,7 +306,6 @@ type FlamegraphSpan struct { TimeUnixNano uint64 `json:"timestamp"` DurationNano uint64 `json:"durationNano"` SpanID string `json:"spanId"` - ParentSpanId string `json:"parentSpanId"` TraceID string `json:"traceId"` HasError bool `json:"hasError"` ServiceName string `json:"serviceName"` @@ -310,39 +315,6 @@ type FlamegraphSpan struct { Children []*FlamegraphSpan `json:"children"` } -type GetWaterfallSpansForTraceWithMetadataCache struct { - StartTime uint64 `json:"startTime"` - EndTime uint64 `json:"endTime"` - DurationNano uint64 `json:"durationNano"` - TotalSpans uint64 
`json:"totalSpans"` - TotalErrorSpans uint64 `json:"totalErrorSpans"` - ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"` - SpanIdToSpanNodeMap map[string]*Span `json:"spanIdToSpanNodeMap"` - TraceRoots []*Span `json:"traceRoots"` -} - -func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) { - return json.Marshal(c) -} -func (c *GetWaterfallSpansForTraceWithMetadataCache) UnmarshalBinary(data []byte) error { - return json.Unmarshal(data, c) -} - -type GetFlamegraphSpansForTraceCache struct { - StartTime uint64 `json:"startTime"` - EndTime uint64 `json:"endTime"` - DurationNano uint64 `json:"durationNano"` - SelectedSpans [][]*FlamegraphSpan `json:"selectedSpans"` - TraceRoots []*FlamegraphSpan `json:"traceRoots"` -} - -func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) { - return json.Marshal(c) -} -func (c *GetFlamegraphSpansForTraceCache) UnmarshalBinary(data []byte) error { - return json.Unmarshal(data, c) -} - type GetWaterfallSpansForTraceWithMetadataResponse struct { StartTimestampMillis uint64 `json:"startTimestampMillis"` EndTimestampMillis uint64 `json:"endTimestampMillis"` From a1d3d6f5036653704e5ec65506da9149008aee4c Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 23 Jan 2025 18:18:45 +0530 Subject: [PATCH 3/5] feat(trace-detail): minor bug fixes --- ee/query-service/app/db/reader.go | 1 - .../app/clickhouseReader/reader.go | 11 +++++++++- pkg/query-service/app/server.go | 2 +- .../app/traces/tracedetail/flamegraph.go | 6 ++---- .../app/traces/tracedetail/waterfall.go | 21 ++++++++++--------- 5 files changed, 24 insertions(+), 17 deletions(-) diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index 071ec70d199..afc6f3af69a 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -18,7 +18,6 @@ type ClickhouseReader struct { *basechr.ClickHouseReader } -// dummy func 
NewDataConnector( localDB *sqlx.DB, promConfigPath string, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 2dbf6e16bb8..be5d084458b 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1520,6 +1520,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con } zap.L().Info("getWaterfallSpansForTraceWithMetadata trace data sql query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) + processingBeforeCache := time.Now() for _, item := range searchScanResponses { ref := []model.OtelSpanRef{} err := json.Unmarshal([]byte(item.References), &ref) @@ -1639,13 +1640,16 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con TraceRoots: traceRoots, } + zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID)) err = r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5) if err != nil { zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err)) } } + processingPostCache := time.Now() selectedSpans, uncollapsedSpans, rootServiceName, rootServiceEntryPoint := tracedetail.GetSelectedSpans(req.UncollapsedSpans, req.SelectedSpanID, traceRoots, spanIdToSpanNodeMap, req.IsSelectedSpanIDUnCollapsed) + zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID)) response.Spans = selectedSpans response.UncollapsedSpans = uncollapsedSpans @@ -1723,6 +1727,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace } zap.L().Info("getFlamegraphSpansForTrace took: ", zap.Duration("duration", 
time.Since(queryStartTime)), zap.String("traceID", traceID)) + processingBeforeCache := time.Now() for _, item := range searchScanResponses { ref := []model.OtelSpanRef{} err := json.Unmarshal([]byte(item.References), &ref) @@ -1789,7 +1794,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace } } - selectedSpans := tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap) + selectedSpans = tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap) traceCache := model.GetFlamegraphSpansForTraceCache{ StartTime: startTime, EndTime: endTime, @@ -1798,13 +1803,17 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace TraceRoots: traceRoots, } + zap.L().Info("getFlamegraphSpansForTrace: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID)) err = r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5) if err != nil { zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(err)) } } + processingPostCache := time.Now() selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans) + zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID)) + trace.Spans = selectedSpansForRequest trace.StartTimestampMillis = startTime trace.EndTimestampMillis = endTime diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 823d0e48dc5..e645913f2dd 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -144,7 +144,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, fluxIntervalForTraceDetail, - nil, + serverOptions.SigNoz.Cache, ) go 
clickhouseReader.Start(readerReady) reader = clickhouseReader diff --git a/pkg/query-service/app/traces/tracedetail/flamegraph.go b/pkg/query-service/app/traces/tracedetail/flamegraph.go index 02ffd091b54..b9412ebf8ce 100644 --- a/pkg/query-service/app/traces/tracedetail/flamegraph.go +++ b/pkg/query-service/app/traces/tracedetail/flamegraph.go @@ -89,8 +89,6 @@ func GetSelectedSpansForFlamegraph(traceRoots []*model.FlamegraphSpan, spanIdToS } func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpans [][]*model.FlamegraphSpan) [][]*model.FlamegraphSpan { - - selectedSpansForRequest := [][]*model.FlamegraphSpan{} var selectedIndex = 0 if selectedSpanID != "" { @@ -107,12 +105,12 @@ func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpan if upperLimit > len(selectedSpans) { lowerLimit = lowerLimit - (upperLimit - len(selectedSpans)) - upperLimit = (len(selectedSpans)) + upperLimit = len(selectedSpans) } if lowerLimit < 0 { lowerLimit = 0 } - return selectedSpansForRequest[lowerLimit:upperLimit] + return selectedSpans[lowerLimit:upperLimit] } diff --git a/pkg/query-service/app/traces/tracedetail/waterfall.go b/pkg/query-service/app/traces/tracedetail/waterfall.go index 0dbb37b28be..e483763e4a6 100644 --- a/pkg/query-service/app/traces/tracedetail/waterfall.go +++ b/pkg/query-service/app/traces/tracedetail/waterfall.go @@ -103,7 +103,7 @@ func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, un return isPresentInSubtreeForTheNode, spansFromRootToNode } -func traverseTrace(span *model.Span, uncollapsedSpans []string, level uint64, hasSibling bool, selectedSpanId string) []*model.Span { +func traverseTrace(span *model.Span, uncollapsedSpans []string, level uint64, isPartOfPreOrder bool, hasSibling bool, selectedSpanId string) []*model.Span { preOrderTraversal := []*model.Span{} // sort the children to maintain the order across requests @@ -137,17 +137,18 @@ func traverseTrace(span *model.Span, 
uncollapsedSpans []string, level uint64, ha SubTreeNodeCount: 0, } - preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren) + if isPartOfPreOrder { + preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren) + } - if contains(uncollapsedSpans, span.SpanID) { - for index, child := range span.Children { - _childTraversal := traverseTrace(child, uncollapsedSpans, level+1, index != (len(span.Children)-1), selectedSpanId) - preOrderTraversal = append(preOrderTraversal, _childTraversal...) - nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1 - // span.SubTreeNodeCount += child.SubTreeNodeCount + 1 - } + for index, child := range span.Children { + _childTraversal := traverseTrace(child, uncollapsedSpans, level+1, isPartOfPreOrder && contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId) + preOrderTraversal = append(preOrderTraversal, _childTraversal...) + nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1 + span.SubTreeNodeCount += child.SubTreeNodeCount + 1 } + nodeWithoutChildren.SubTreeNodeCount += 1 return preOrderTraversal } @@ -182,7 +183,7 @@ func GetSelectedSpans(uncollapsedSpans []string, selectedSpanID string, traceRoo _, spansFromRootToNode := getPathFromRootToSelectedSpanId(rootNode, selectedSpanID, updatedUncollapsedSpans, isSelectedSpanIDUnCollapsed) updatedUncollapsedSpans = append(updatedUncollapsedSpans, spansFromRootToNode...) 
- _preOrderTraversal := traverseTrace(rootNode, uncollapsedSpans, 0, false, selectedSpanID) + _preOrderTraversal := traverseTrace(rootNode, updatedUncollapsedSpans, 0, true, false, selectedSpanID) _selectedSpanIndex := findIndexForSelectedSpanFromPreOrder(_preOrderTraversal, selectedSpanID) if _selectedSpanIndex != -1 { From b33aa741977803326d185beb5a9aec83e1d1b416 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 23 Jan 2025 22:22:17 +0530 Subject: [PATCH 4/5] feat(trace-detail): address review comments --- ee/query-service/main.go | 2 +- .../app/clickhouseReader/reader.go | 191 +++++++++--------- .../app/traces/tracedetail/waterfall.go | 21 +- 3 files changed, 100 insertions(+), 114 deletions(-) diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 9fdb4f17c41..52a34c1b739 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -120,7 +120,7 @@ func main() { flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") - flag.StringVar(&fluxInterval, "flux-interval", "0m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)") + flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)") flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')") diff --git 
a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index be5d084458b..5556c31f816 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1454,71 +1454,84 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc return &searchSpansResult, nil } -func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) { - response := new(model.GetWaterfallSpansForTraceWithMetadataResponse) - var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64 - var spanIdToSpanNodeMap = map[string]*model.Span{} - var traceRoots []*model.Span - var serviceNameToTotalDurationMap = map[string]uint64{} - var serviceNameIntervalMap = map[string][]tracedetail.Interval{} - var useCache bool = true +func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string) ([]model.SpanItemV2, *model.ApiError) { + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return []model.SpanItemV2{}, nil + } + zap.L().Error("Error in processing trace summary sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err)) + } + + var searchScanResponses []model.SpanItemV2 + query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, 
r.traceTableName) + queryStartTime := time.Now() + err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + zap.L().Info(query) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) + } + zap.L().Info("trace details query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) + + return searchScanResponses, nil +} +func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) { cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache) cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false) if err != nil { zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err), zap.String("traceID", traceID)) - useCache = false + return nil, err } + if cacheStatus != cache.RetrieveStatusHit { - useCache = false + return nil, errors.Errorf("cache status for getWaterfallSpansForTraceWithMetadata : %s, traceID: %s", cacheStatus, traceID) } - if err == nil && cacheStatus == cache.RetrieveStatusHit { + if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + zap.L().Info("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache", zap.String("traceID", traceID)) + return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache, traceID: %s", traceID) + } - if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { - zap.L().Info("the trace end time falls under the flux interval, skipping 
getWaterfallSpansForTraceWithMetadata cache", zap.String("traceID", traceID)) - useCache = false - } + zap.L().Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) + return cachedTraceData, nil +} - if useCache { - zap.L().Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) - startTime = cachedTraceData.StartTime - endTime = cachedTraceData.EndTime - durationNano = cachedTraceData.DurationNano - spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap - serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap - traceRoots = cachedTraceData.TraceRoots - totalSpans = cachedTraceData.TotalSpans - totalErrorSpans = cachedTraceData.TotalErrorSpans - } +func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) { + response := new(model.GetWaterfallSpansForTraceWithMetadataResponse) + var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64 + var spanIdToSpanNodeMap = map[string]*model.Span{} + var traceRoots []*model.Span + var serviceNameToTotalDurationMap = map[string]uint64{} + var serviceNameIntervalMap = map[string][]tracedetail.Interval{} + cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID) + if err == nil { + startTime = cachedTraceData.StartTime + endTime = cachedTraceData.EndTime + durationNano = cachedTraceData.DurationNano + spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap + serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap + traceRoots = cachedTraceData.TraceRoots + totalSpans = cachedTraceData.TotalSpans + totalErrorSpans = cachedTraceData.TotalErrorSpans } - if !useCache { + if err != nil { zap.L().Info("cache miss for 
getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) - var traceSummary model.TraceSummary - summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) - err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + searchScanResponses, err := r.GetSpansForTrace(ctx, traceID) if err != nil { - if err == sql.ErrNoRows { - return response, nil - } - zap.L().Error("error in processing trace summary sql query", zap.Error(err)) - return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err)) + return nil, err } - totalSpans = traceSummary.NumSpans - - var searchScanResponses []model.SpanItemV2 - query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) - queryStartTime := time.Now() - err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) - zap.L().Info(query) - if err != nil { - zap.L().Error("error in processing trace data sql query", zap.Error(err)) - return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) + if len(searchScanResponses) == 0 { + return response, nil } - zap.L().Info("getWaterfallSpansForTraceWithMetadata trace data sql query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) + totalSpans = uint64(len(searchScanResponses)) processingBeforeCache := time.Now() for _, item := range searchScanResponses { @@ -1633,7 +1646,7 @@ func (r 
*ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con StartTime: startTime, EndTime: endTime, DurationNano: durationNano, - TotalSpans: traceSummary.NumSpans, + TotalSpans: totalSpans, TotalErrorSpans: totalErrorSpans, SpanIdToSpanNodeMap: spanIdToSpanNodeMap, ServiceNameToTotalDurationMap: serviceNameToTotalDurationMap, @@ -1641,8 +1654,8 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con } zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID)) - err = r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5) - if err != nil { + cacheErr := r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5) + if cacheErr != nil { zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err)) } } @@ -1664,68 +1677,56 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con return response, nil } -func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) { - trace := new(model.GetFlamegraphSpansForTraceResponse) - var startTime, endTime, durationNano uint64 - var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{} - // map[traceID][level]span - var selectedSpans = [][]*model.FlamegraphSpan{} - var traceRoots []*model.FlamegraphSpan - var useCache bool = true - - // get the trace tree from cache! 
+func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) { cachedTraceData := new(model.GetFlamegraphSpansForTraceCache) cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false) if err != nil { zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err), zap.String("traceID", traceID)) - useCache = false + return nil, err } if cacheStatus != cache.RetrieveStatusHit { - useCache = false + return nil, errors.Errorf("cache status for getFlamegraphSpansForTrace : %s, traceID: %s", cacheStatus, traceID) } - if err == nil && cacheStatus == cache.RetrieveStatusHit { + if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { + zap.L().Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", zap.String("traceID", traceID)) + return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache, traceID: %s", traceID) + } - if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail { - zap.L().Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", zap.String("traceID", traceID)) - useCache = false - } + zap.L().Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) + return cachedTraceData, nil +} - if useCache { - zap.L().Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) - startTime = cachedTraceData.StartTime - endTime = cachedTraceData.EndTime - durationNano = cachedTraceData.DurationNano - selectedSpans = cachedTraceData.SelectedSpans - traceRoots = cachedTraceData.TraceRoots - } +func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req 
*model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) { + trace := new(model.GetFlamegraphSpansForTraceResponse) + var startTime, endTime, durationNano uint64 + var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{} + // map[traceID][level]span + var selectedSpans = [][]*model.FlamegraphSpan{} + var traceRoots []*model.FlamegraphSpan + + // get the trace tree from cache! + cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, traceID) + + if err == nil { + startTime = cachedTraceData.StartTime + endTime = cachedTraceData.EndTime + durationNano = cachedTraceData.DurationNano + selectedSpans = cachedTraceData.SelectedSpans + traceRoots = cachedTraceData.TraceRoots } - if !useCache { + if err != nil { zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) - var traceSummary model.TraceSummary - summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) - err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + searchScanResponses, err := r.GetSpansForTrace(ctx, traceID) if err != nil { - if err == sql.ErrNoRows { - return trace, nil - } - zap.L().Error("Error in processing trace summary sql query", zap.Error(err)) - return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err)) + return nil, err } - - var searchScanResponses []model.SpanItemV2 - query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) - queryStartTime := time.Now() - err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) - 
zap.L().Info(query) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) + if len(searchScanResponses) == 0 { + return trace, nil } - zap.L().Info("getFlamegraphSpansForTrace took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID)) processingBeforeCache := time.Now() for _, item := range searchScanResponses { @@ -1804,8 +1805,8 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace } zap.L().Info("getFlamegraphSpansForTrace: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID)) - err = r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5) - if err != nil { + cacheErr := r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5) + if cacheErr != nil { zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(err)) } } diff --git a/pkg/query-service/app/traces/tracedetail/waterfall.go b/pkg/query-service/app/traces/tracedetail/waterfall.go index e483763e4a6..36aa88ef4fd 100644 --- a/pkg/query-service/app/traces/tracedetail/waterfall.go +++ b/pkg/query-service/app/traces/tracedetail/waterfall.go @@ -1,6 +1,7 @@ package tracedetail import ( + "slices" "sort" "go.signoz.io/signoz/pkg/query-service/model" @@ -16,13 +17,6 @@ type Interval struct { Service string } -func max(a, b uint64) uint64 { - if a > b { - return a - } - return b -} - func mergeIntervals(intervals []Interval) []Interval { if len(intervals) == 0 { return nil @@ -47,15 +41,6 @@ func mergeIntervals(intervals []Interval) []Interval { return merged } -func contains(slice []string, item string) bool { - for _, v := range slice { - if v == item { - return true - } - } - return false -} - func 
ContainsWaterfallSpan(slice []*model.Span, item *model.Span) bool { for _, v := range slice { if v.SpanID == item.SpanID { @@ -93,7 +78,7 @@ func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, un isPresentInThisSubtree, _spansFromRootToNode := getPathFromRootToSelectedSpanId(child, selectedSpanId, uncollapsedSpans, isSelectedSpanIDUnCollapsed) // if the interested node is present in the given subtree then add the span node to uncollapsed node list if isPresentInThisSubtree { - if !contains(uncollapsedSpans, node.SpanID) { + if !slices.Contains(uncollapsedSpans, node.SpanID) { spansFromRootToNode = append(spansFromRootToNode, node.SpanID) } isPresentInSubtreeForTheNode = true @@ -142,7 +127,7 @@ func traverseTrace(span *model.Span, uncollapsedSpans []string, level uint64, is } for index, child := range span.Children { - _childTraversal := traverseTrace(child, uncollapsedSpans, level+1, isPartOfPreOrder && contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId) + _childTraversal := traverseTrace(child, uncollapsedSpans, level+1, isPartOfPreOrder && slices.Contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId) preOrderTraversal = append(preOrderTraversal, _childTraversal...) 
nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1 span.SubTreeNodeCount += child.SubTreeNodeCount + 1 From 52ad95e7668b67ba2d1096514ff8bd204681d031 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 23 Jan 2025 22:34:38 +0530 Subject: [PATCH 5/5] feat(trace-detail): address review comments --- pkg/query-service/app/clickhouseReader/reader.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 5556c31f816..89f8c0536de 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1454,7 +1454,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc return &searchSpansResult, nil } -func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string) ([]model.SpanItemV2, *model.ApiError) { +func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string, traceDetailsQuery string) ([]model.SpanItemV2, *model.ApiError) { var traceSummary model.TraceSummary summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) @@ -1467,10 +1467,9 @@ func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string) } var searchScanResponses []model.SpanItemV2 - query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName) queryStartTime := time.Now() - err = r.db.Select(ctx, &searchScanResponses, query, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) - 
zap.L().Info(query) + err = r.db.Select(ctx, &searchScanResponses, traceDetailsQuery, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + zap.L().Info(traceDetailsQuery) if err != nil { zap.L().Error("Error in processing sql query", zap.Error(err)) return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err)) @@ -1524,7 +1523,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con if err != nil { zap.L().Info("cache miss for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID)) - searchScanResponses, err := r.GetSpansForTrace(ctx, traceID) + searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName)) if err != nil { return nil, err } @@ -1720,7 +1719,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace if err != nil { zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID)) - searchScanResponses, err := r.GetSpansForTrace(ctx, traceID) + searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName)) if err != nil { return nil, err }