Skip to content

Commit

Permalink
Extend distributor trace logger with optional features to include spa…
Browse files Browse the repository at this point in the history
…n attributes and filter by error status (grafana#1465)

* Extend distributor trace logger with optional features to include the span attributes and to filter by status error

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* format dependencies in test file

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* update CHANGELOG.md

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* apply lint

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* update docs, update e2e tests

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* make LogReceivedTraces backward compatible, update docs

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* fix config file

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* add optional log_received_spans, update tests to test resources attributes, update docs

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* update integration tests to use new log_received_spans

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* test LogReceivedTraces flow, remove blank space in doc

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* use optional spans parameter in makeInstrumentationLibrary to simplify tests

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* use assert.EqualValues instead of reflect.DeepEqual

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* fix typo in test

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* * Split logSpanWithAttribute into smaller functions.
* Update CHANGELOG with deprecated flag

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

* Update modules/distributor/config.go

Co-authored-by: Koenraad Verheyden <[email protected]>

* Update docs/tempo/website/configuration/_index.md

Co-authored-by: Kim Nylander <[email protected]>

* * Rename include_attributes to include_all_attributes
* Add warning log when the log_received_traces flag is used

Signed-off-by: Fausto David Suarez Rosario <[email protected]>

Co-authored-by: Koenraad Verheyden <[email protected]>
Co-authored-by: Kim Nylander <[email protected]>
  • Loading branch information
3 people authored Jun 13, 2022
1 parent 6c35e24 commit da138b1
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## main / unreleased

* [FEATURE] Mark `log_received_traces` as deprecated. New flag is `log_received_spans`.
Extend distributor spans logger with optional features to include span attributes and a filter by error status. [#1465](https://github.com/grafana/tempo/pull/1465) (@faustodavid)
* [CHANGE] metrics-generator: Changed added metric label `instance` to `__metrics_gen_instance` to reduce collisions with custom dimensions. [#1439](https://github.com/grafana/tempo/pull/1439) (@joe-elliott)
* [CHANGE] Don't enforce `max_bytes_per_tag_values_query` when set to 0. [#1447](https://github.com/grafana/tempo/pull/1447) (@joe-elliott)
* [CHANGE] Add new querier service in deployment jsonnet to serve `/status` endpoint. [#1474](https://github.com/grafana/tempo/pull/1474) (@annanay25)
Expand Down
4 changes: 4 additions & 0 deletions cmd/tempo/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (c *Config) CheckConfig() {
if c.StorageConfig.Trace.BlocklistPollConcurrency == 0 {
level.Warn(log.Logger).Log("msg", "c.StorageConfig.Trace.BlocklistPollConcurrency must be greater than zero. Using default.", "default", tempodb.DefaultBlocklistPollConcurrency)
}

if c.Distributor.LogReceivedTraces {
level.Warn(log.Logger).Log("msg", "c.Distributor.LogReceivedTraces is deprecated. The new flag is c.Distributor.log_received_spans.enabled")
}
}

func (c *Config) Describe(ch chan<- *prometheus.Desc) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (t *App) initOverrides() (services.Service, error) {

func (t *App) initDistributor() (services.Service, error) {
// todo: make ingester client a module instead of passing the config everywhere
distributor, err := distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.cfg.GeneratorClient, t.generatorRing, t.overrides, t.TracesConsumerMiddleware, t.cfg.Server.LogLevel, t.cfg.SearchEnabled, t.cfg.MetricsGeneratorEnabled, prometheus.DefaultRegisterer)
distributor, err := distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.cfg.GeneratorClient, t.generatorRing, t.overrides, t.TracesConsumerMiddleware, log.Logger, t.cfg.Server.LogLevel, t.cfg.SearchEnabled, t.cfg.MetricsGeneratorEnabled, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create distributor %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,18 @@ distributor:

# Optional.
# Enable to log every received trace id to help debug ingestion
[log_received_traces: <bool>]
# WARNING: Deprecated. Use log_received_spans instead.
[log_received_traces: <boolean> | default = false]

# Optional.
# disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs
log_received_spans:
[enabled: <boolean> | default = false]
[include_all_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# Disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
# note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
[extend_writes: <bool>]

Expand Down
3 changes: 2 additions & 1 deletion docs/tempo/website/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ distributor:
instance_addr: ""
receivers: {}
override_ring_key: distributor
log_received_traces: false
log_received_spans:
enabled: false
extend_writes: true
search_tags_deny_list: []
ingester_client:
Expand Down
3 changes: 2 additions & 1 deletion integration/e2e/config-all-in-one-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ distributor:
protocols:
grpc:
zipkin:
log_received_traces: true
log_received_spans:
enabled: true

ingester:
lifecycler:
Expand Down
3 changes: 2 additions & 1 deletion integration/e2e/config-metrics-generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ distributor:
jaeger:
protocols:
grpc:
log_received_traces: true
log_received_spans:
enabled: true

ingester:
lifecycler:
Expand Down
15 changes: 13 additions & 2 deletions modules/distributor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/grafana/dskit/flagext"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/tempo/pkg/util"
)

var defaultReceivers = map[string]interface{}{
Expand All @@ -31,7 +32,8 @@ type Config struct {
// otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
Receivers map[string]interface{} `yaml:"receivers"`
OverrideRingKey string `yaml:"override_ring_key"`
LogReceivedTraces bool `yaml:"log_received_traces"`
LogReceivedTraces bool `yaml:"log_received_traces"` // Deprecated
LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`

// disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
// note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
Expand All @@ -43,6 +45,12 @@ type Config struct {
factory func(addr string) (ring_client.PoolClient, error) `yaml:"-"`
}

// LogReceivedSpansConfig controls the distributor's per-span ingestion logging
// (the `log_received_spans` YAML block). It supersedes the deprecated
// boolean `log_received_traces` flag.
type LogReceivedSpansConfig struct {
// Enabled turns on logging of every received span.
Enabled bool `yaml:"enabled"`
// IncludeAllAttributes also attaches resource- and span-level attributes
// (plus derived duration/kind/status fields) to each log line.
IncludeAllAttributes bool `yaml:"include_all_attributes"`
// FilterByStatusError restricts logging to spans whose status code is
// STATUS_CODE_ERROR.
FilterByStatusError bool `yaml:"filter_by_status_error"`
}

// RegisterFlagsAndApplyDefaults registers flags and applies defaults
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
flagext.DefaultValues(&cfg.DistributorRing)
Expand All @@ -52,5 +60,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = distributorRingKey
cfg.ExtendWrites = true

f.BoolVar(&cfg.LogReceivedTraces, prefix+".log-received-traces", false, "Enable to log every received trace id to help debug ingestion.")
f.BoolVar(&cfg.LogReceivedTraces, util.PrefixConfig(prefix, "log-received-traces"), false, "Enable to log every received trace id to help debug ingestion.")
f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
}
75 changes: 65 additions & 10 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/util/strutil"
"github.com/segmentio/fasthash/fnv1a"

"github.com/pkg/errors"
Expand All @@ -34,7 +36,7 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/log"
tempo_util "github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/validation"
)

Expand Down Expand Up @@ -137,10 +139,12 @@ type Distributor struct {
// Manager for subservices
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

logger log.Logger
}

// New a distributor creates.
func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRing, generatorClientCfg generator_client.Config, generatorsRing ring.ReadRing, o *overrides.Overrides, middleware receiver.Middleware, loggingLevel logging.Level, searchEnabled bool, metricsGeneratorEnabled bool, reg prometheus.Registerer) (*Distributor, error) {
func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRing, generatorClientCfg generator_client.Config, generatorsRing ring.ReadRing, o *overrides.Overrides, middleware receiver.Middleware, logger log.Logger, loggingLevel logging.Level, searchEnabled bool, metricsGeneratorEnabled bool, reg prometheus.Registerer) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
Expand All @@ -156,14 +160,14 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi

if o.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy {
lifecyclerCfg := cfg.DistributorRing.ToLifecyclerConfig()
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, nil, "distributor", cfg.OverrideRingKey, false, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, nil, "distributor", cfg.OverrideRingKey, false, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, err
}
subservices = append(subservices, lifecycler)
ingestionRateStrategy = newGlobalIngestionRateStrategy(o, lifecycler)

ring, err := ring.New(lifecyclerCfg.RingConfig, "distributor", cfg.OverrideRingKey, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
ring, err := ring.New(lifecyclerCfg.RingConfig, "distributor", cfg.OverrideRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "unable to initialize distributor ring")
}
Expand All @@ -178,7 +182,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
ring_client.NewRingServiceDiscovery(ingestersRing),
factory,
metricIngesterClients,
log.Logger)
logger)

subservices = append(subservices, pool)

Expand All @@ -202,6 +206,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
globalTagsToDrop: tagsToDrop,
overrides: o,
traceEncoder: model.MustNewSegmentDecoder(model.CurrentEncoding),
logger: logger,
}

if metricsGeneratorEnabled {
Expand All @@ -213,7 +218,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
return generator_client.New(addr, generatorClientCfg)
},
metricGeneratorClients,
log.Logger,
logger,
)

subservices = append(subservices, d.generatorsPool)
Expand Down Expand Up @@ -279,8 +284,12 @@ func (d *Distributor) PushBatches(ctx context.Context, batches []*v1.ResourceSpa
return nil, err
}

if d.cfg.LogReceivedTraces {
logTraces(batches)
if d.cfg.LogReceivedSpans.Enabled || d.cfg.LogReceivedTraces {
if d.cfg.LogReceivedSpans.IncludeAllAttributes {
logSpansWithAllAttributes(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
} else {
logSpans(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
}
}

// metric size
Expand Down Expand Up @@ -531,16 +540,62 @@ func recordDiscaredSpans(err error, userID string, spanCount int) {
}
}

func logTraces(batches []*v1.ResourceSpans) {
func logSpans(batches []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
for _, b := range batches {
for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
level.Info(log.Logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
if filterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR {
continue
}
level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
}
}
}
}

// logSpansWithAllAttributes logs every received span in the given batches,
// attaching all resource- and span-level attributes as log fields and
// optionally keeping only spans with an error status.
func logSpansWithAllAttributes(batch []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
	for _, rs := range batch {
		logSpansInResourceWithAllAttributes(rs, filterByStatusError, logger)
	}
}

// logSpansInResourceWithAllAttributes binds every resource-level attribute to
// the logger (key prefixed with "span_" and sanitized into a valid label
// name), then logs each span in the resource, honouring the error-status
// filter.
func logSpansInResourceWithAllAttributes(rs *v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
	for _, attr := range rs.Resource.GetAttributes() {
		logger = log.With(logger,
			"span_"+strutil.SanitizeLabelName(attr.GetKey()),
			tempo_util.StringifyAnyValue(attr.GetValue()))
	}

	for _, scope := range rs.InstrumentationLibrarySpans {
		for _, span := range scope.Spans {
			if filterByStatusError && span.Status.Code != v1.Status_STATUS_CODE_ERROR {
				continue
			}

			logSpanWithAllAttributes(span, logger)
		}
	}
}

// logSpanWithAllAttributes logs a single span at info level. Each span
// attribute is added as a log field (key prefixed with "span_" and
// sanitized), along with derived duration, kind and status fields.
func logSpanWithAllAttributes(s *v1.Span, logger log.Logger) {
	for _, attr := range s.GetAttributes() {
		logger = log.With(logger,
			"span_"+strutil.SanitizeLabelName(attr.GetKey()),
			tempo_util.StringifyAnyValue(attr.GetValue()))
	}

	// Span duration derived from the nanosecond start/end timestamps.
	durationSeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds())
	logger = log.With(logger,
		"span_duration_seconds", durationSeconds,
		"span_kind", s.GetKind().String(),
		"span_status", s.GetStatus().GetCode().String())

	level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
}

// startEndFromSpan returns a unix epoch timestamp in seconds for the start and end of a span
func startEndFromSpan(span *v1.Span) (uint32, uint32) {
return uint32(span.StartTimeUnixNano / uint64(time.Second)), uint32(span.EndTimeUnixNano / uint64(time.Second))
Expand Down
Loading

0 comments on commit da138b1

Please sign in to comment.