diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index cc204f85438..f3593342cbe 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -32,6 +32,9 @@ extensions: traces_archive: another_store ui: config_file: ./cmd/jaeger/config-ui.json + # The maximum duration that is considered for clock skew adjustments. + # Defaults to 0 seconds, which means it's disabled. + max_clock_skew_adjust: 0s jaeger_storage: backends: diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 063da1fb8b5..82aeeeeb611 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -95,8 +95,12 @@ func (s *server) Start(ctx context.Context, host component.Host) error { return fmt.Errorf("cannot create dependencies reader: %w", err) } - var opts querysvc.QueryServiceOptions - var v2opts v2querysvc.QueryServiceOptions + opts := querysvc.QueryServiceOptions{ + MaxClockSkewAdjust: s.config.MaxClockSkewAdjust, + } + v2opts := v2querysvc.QueryServiceOptions{ + MaxClockSkewAdjust: s.config.MaxClockSkewAdjust, + } if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil { return err } diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 2110c9f2a70..0dbca1f7008 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -23,9 +23,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - v2adjuster "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster" v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc" - "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/tenancy" @@ -47,6 +45,10 @@ const ( queryEnableTracing = "query.enable-tracing" ) +const ( + defaultMaxClockSkewAdjust = 0 * time.Second +) + var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{ Prefix: "query.grpc", } @@ -94,7 +96,7 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.Bool(queryLogStaticAssetsAccess, false, "Log when static assets are accessed (for debugging)") flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format") flagSet.Bool(queryTokenPropagation, false, "Allow propagation of bearer token to be used by storage plugins") - flagSet.Duration(queryMaxClockSkewAdjust, 0, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments") + flagSet.Duration(queryMaxClockSkewAdjust, defaultMaxClockSkewAdjust, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments") flagSet.Bool(queryEnableTracing, false, "Enables emitting jaeger-query traces") tlsGRPCFlagsConfig.AddFlags(flagSet) tlsHTTPFlagsConfig.AddFlags(flagSet) @@ -145,7 +147,9 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions( initArchiveStorageFn InitArchiveStorageFn, logger *zap.Logger, ) *querysvc.QueryServiceOptions { - opts := &querysvc.QueryServiceOptions{} + opts := &querysvc.QueryServiceOptions{ + MaxClockSkewAdjust: qOpts.MaxClockSkewAdjust, + } ar, aw := initArchiveStorageFn(logger) if ar != nil && aw != nil { opts.ArchiveSpanReader = ar @@ -154,13 +158,13 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions( logger.Info("Archive storage not initialized") } - opts.Adjuster = adjuster.Sequence(querysvc.StandardAdjusters(qOpts.MaxClockSkewAdjust)...) - return opts } func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(initArchiveStorageFn InitArchiveStorageFn, logger *zap.Logger) *v2querysvc.QueryServiceOptions { - opts := &v2querysvc.QueryServiceOptions{} + opts := &v2querysvc.QueryServiceOptions{ + MaxClockSkewAdjust: qOpts.MaxClockSkewAdjust, + } ar, aw := initArchiveStorageFn(logger) if ar != nil && aw != nil { @@ -170,8 +174,6 @@ func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(initArchiveStorageFn InitA logger.Info("Archive storage not initialized") } - opts.Adjuster = v2adjuster.Sequence(v2adjuster.StandardAdjusters(qOpts.MaxClockSkewAdjust)...) - return opts } @@ -206,6 +208,7 @@ func mapHTTPHeaderToOTELHeaders(h http.Header) map[string]configopaque.String { func DefaultQueryOptions() QueryOptions { return QueryOptions{ + MaxClockSkewAdjust: defaultMaxClockSkewAdjust, HTTP: confighttp.ServerConfig{ Endpoint: ports.PortToHostPort(ports.QueryHTTP), }, diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 86f07f367fd..59eb1c7dd5c 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -102,9 +102,9 @@ func TestBuildQueryServiceOptions(t *testing.T) { qSvcOpts := qOpts.BuildQueryServiceOptions(initializedFn, zap.NewNop()) assert.NotNil(t, qSvcOpts) - assert.NotNil(t, qSvcOpts.Adjuster) assert.NotNil(t, qSvcOpts.ArchiveSpanReader) assert.NotNil(t, qSvcOpts.ArchiveSpanWriter) + assert.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust) } func TestBuildQueryServiceOptions_NoArchiveStorage(t *testing.T) { @@ -116,9 +116,9 @@ func TestBuildQueryServiceOptions_NoArchiveStorage(t *testing.T) { logger, logBuf := testutils.NewLogger() qSvcOpts := qOpts.BuildQueryServiceOptions(uninitializedFn, logger) assert.NotNil(t, qSvcOpts) - assert.NotNil(t, qSvcOpts.Adjuster) assert.Nil(t, qSvcOpts.ArchiveSpanReader) assert.Nil(t, qSvcOpts.ArchiveSpanWriter) + assert.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust) require.Contains(t, logBuf.String(), "Archive storage not initialized") } @@ -132,9 +132,9 @@ func TestBuildQueryServiceOptionsV2(t *testing.T) { qSvcOpts := qOpts.BuildQueryServiceOptionsV2(initializedFn, zap.NewNop()) assert.NotNil(t, qSvcOpts) - assert.NotNil(t, qSvcOpts.Adjuster) assert.NotNil(t, qSvcOpts.ArchiveTraceReader) assert.NotNil(t, qSvcOpts.ArchiveTraceWriter) + assert.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust) } func TestBuildQueryServiceOptionsV2_NoArchiveStorage(t *testing.T) { diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 914a302e30b..a5f2c6a15a7 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -18,15 +18,11 @@ import ( var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") -const ( - defaultMaxClockSkewAdjust = time.Second -) - // QueryServiceOptions has optional members of QueryService type QueryServiceOptions struct { - ArchiveSpanReader spanstore.Reader - ArchiveSpanWriter spanstore.Writer - Adjuster adjuster.Adjuster + ArchiveSpanReader spanstore.Reader + ArchiveSpanWriter spanstore.Writer + MaxClockSkewAdjust time.Duration } // StorageCapabilities is a feature flag for query service @@ -41,6 +37,7 @@ type StorageCapabilities struct { type QueryService struct { spanReader spanstore.Reader dependencyReader depstore.Reader + adjuster adjuster.Adjuster options QueryServiceOptions } @@ -67,12 +64,10 @@ func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Re qsvc := &QueryService{ spanReader: spanReader, dependencyReader: dependencyReader, + adjuster: adjuster.Sequence(StandardAdjusters(options.MaxClockSkewAdjust)...), options: options, } - if qsvc.options.Adjuster == nil { - qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters(defaultMaxClockSkewAdjust)...) - } return qsvc } @@ -143,7 +138,7 @@ func (qs QueryService) ArchiveTrace(ctx context.Context, query spanstore.GetTrac // Adjust applies adjusters to the trace. func (qs QueryService) adjust(trace *model.Trace) { - qs.options.Adjuster.Adjust(trace) + qs.adjuster.Adjust(trace) } // GetDependencies implements dependencystore.Reader.GetDependencies diff --git a/cmd/query/app/querysvc/v2/querysvc/service.go b/cmd/query/app/querysvc/v2/querysvc/service.go index cf3f93713d7..e23f0460557 100644 --- a/cmd/query/app/querysvc/v2/querysvc/service.go +++ b/cmd/query/app/querysvc/v2/querysvc/service.go @@ -21,19 +21,14 @@ import ( var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") -const ( - defaultMaxClockSkewAdjust = time.Second -) - // QueryServiceOptions holds the configuration options for the query service. type QueryServiceOptions struct { // ArchiveTraceReader is used to read archived traces from the storage. ArchiveTraceReader tracestore.Reader // ArchiveTraceWriter is used to write traces to the archive storage. ArchiveTraceWriter tracestore.Writer - // Adjuster is used to adjust traces before they are returned to the client. - // If not set, the default adjuster will be used. - Adjuster adjuster.Adjuster + // MaxClockSkewAdjust is the maximum duration by which to adjust a span. + MaxClockSkewAdjust time.Duration } // StorageCapabilities is a feature flag for query service @@ -48,6 +43,7 @@ type StorageCapabilities struct { type QueryService struct { traceReader tracestore.Reader dependencyReader depstore.Reader + adjuster adjuster.Adjuster options QueryServiceOptions } @@ -76,13 +72,12 @@ func NewQueryService( qsvc := &QueryService{ traceReader: traceReader, dependencyReader: dependencyReader, - options: options, + adjuster: adjuster.Sequence( + adjuster.StandardAdjusters(options.MaxClockSkewAdjust)..., + ), + options: options, } - if qsvc.options.Adjuster == nil { - qsvc.options.Adjuster = adjuster.Sequence( - adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) - } return qsvc } @@ -191,7 +186,7 @@ func (qs QueryService) receiveTraces( } for _, trace := range traces { if !rawTraces { - qs.options.Adjuster.Adjust(trace) + qs.adjuster.Adjust(trace) } jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { foundTraceIDs[span.TraceID()] = struct{}{} diff --git a/cmd/query/app/querysvc/v2/querysvc/service_test.go b/cmd/query/app/querysvc/v2/querysvc/service_test.go index 51850c1b331..3152aa33f45 100644 --- a/cmd/query/app/querysvc/v2/querysvc/service_test.go +++ b/cmd/query/app/querysvc/v2/querysvc/service_test.go @@ -59,12 +59,6 @@ func withArchiveTraceWriter() testOption { } } -func withAdjuster(adj adjuster.Adjuster) testOption { - return func(_ *testQueryService, options *QueryServiceOptions) { - options.Adjuster = adj - } -} - func initializeTestService(opts ...testOption) *testQueryService { traceReader := &tracestoremocks.Reader{} dependencyStorage := &depstoremocks.Reader{} @@ -483,7 +477,8 @@ func TestFindTraces_WithRawTraces_PerformsAggregation(t *testing.T) { adjustCalls++ }) - tqs := initializeTestService(withAdjuster(adj)) + tqs := initializeTestService() + tqs.queryService.adjuster = adj duration, err := time.ParseDuration("20ms") require.NoError(t, err) now := time.Now()