Skip to content

Commit

Permalink
feat(metering): add meter for traces (#496)
Browse files Browse the repository at this point in the history
#### Features

- The metering for traces is dependent on SigNoz's custom schema and not
just on `ptrace.Traces`. The schema specific stuff has been moved to a
separate package. It is not recommended to import the
clickhousetracesexporter package in the metering package as it will
bring a lot of unwanted dependencies into the codebase of whoever is
importing it.
- Revert the modules created for metering and pdatagen. We thought we
would import unwanted dependencies in the gateway which is not the case.
We are only importing what we need there.


#### Fixes

- Remove redundant loggers in the LogsSize function.
  • Loading branch information
grandwizard28 authored Jan 2, 2025
1 parent 65aeb4a commit a9b7c29
Show file tree
Hide file tree
Showing 16 changed files with 671 additions and 210 deletions.
2 changes: 2 additions & 0 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func newExporter(cfg component.Config, logger *zap.Logger, settings exporter.Set
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
useNewSchema: configClickHouse.UseNewSchema,
logger: logger,
}

return &storage, nil
Expand All @@ -101,6 +102,7 @@ type storage struct {
wg *sync.WaitGroup
closeChan chan struct{}
useNewSchema bool
logger *zap.Logger
}

type storageConfig struct {
Expand Down
7 changes: 6 additions & 1 deletion exporter/clickhousetracesexporter/schema-signoz.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap/zapcore"
)

// TODO: Read from github.com/SigNoz/signoz-otel-collector/pkg/schema/traces
type Event struct {
Name string `json:"name,omitempty"`
TimeUnixNano uint64 `json:"timeUnixNano,omitempty"`
Expand Down Expand Up @@ -131,6 +132,7 @@ type Span struct {
SpanAttributes []SpanAttribute `json:"spanAttributes,omitempty"`
}

// TODO: Read from github.com/SigNoz/signoz-otel-collector/pkg/schema/traces
type ErrorEvent struct {
Event Event `json:"errorEvent,omitempty"`
ErrorID string `json:"errorID,omitempty"`
Expand Down Expand Up @@ -167,7 +169,9 @@ type SpanV3 struct {
ResourcesString map[string]string `json:"resources_string,omitempty"`

// for events
Events []string `json:"event,omitempty"`
// TODO: Read from github.com/SigNoz/signoz-otel-collector/pkg/schema/traces
Events []string `json:"event,omitempty"`
// TODO: Read from github.com/SigNoz/signoz-otel-collector/pkg/schema/traces
ErrorEvents []ErrorEvent `json:"-"`

ServiceName string `json:"serviceName,omitempty"` // for error table
Expand Down Expand Up @@ -239,6 +243,7 @@ func (s *Span) MarshalLogObject(enc zapcore.ObjectEncoder) error {
return nil
}

// TODO: Read from github.com/SigNoz/signoz-otel-collector/pkg/schema/traces
type OtelSpanRef struct {
TraceId string `json:"traceId,omitempty"`
SpanId string `json:"spanId,omitempty"`
Expand Down
29 changes: 0 additions & 29 deletions pkg/metering/go.mod

This file was deleted.

83 changes: 0 additions & 83 deletions pkg/metering/go.sum

This file was deleted.

158 changes: 156 additions & 2 deletions pkg/metering/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package metering

import (
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/SigNoz/signoz-otel-collector/pkg/schema/traces"
"github.com/SigNoz/signoz-otel-collector/utils"
"github.com/SigNoz/signoz-otel-collector/utils/flatten"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
)

type jsonSizer struct {
Logger *zap.Logger
}

func NewJSONSizer(logger *zap.Logger) *jsonSizer {
func NewJSONSizer(logger *zap.Logger) Sizer {
return &jsonSizer{
Logger: logger,
}
Expand All @@ -19,9 +26,156 @@ func NewJSONSizer(logger *zap.Logger) *jsonSizer {
func (sizer *jsonSizer) SizeOfMapStringAny(input map[string]any) int {
bytes, err := json.Marshal(input)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Any("obj", input))
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
return 0
}

return len(bytes)
}

func (sizer *jsonSizer) SizeOfFlatPcommonMapInMapStringString(input pcommon.Map) int {
output := map[string]string{}

input.Range(func(k string, v pcommon.Value) bool {
switch v.Type() {
case pcommon.ValueTypeMap:
flattened := flatten.FlattenJSON(v.Map().AsRaw(), k)
for kf, vf := range flattened {
output[kf] = fmt.Sprintf("%v", vf)
}
default:
output[k] = v.AsString()
}
return true
})

bytes, err := json.Marshal(output)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
return 0
}

return len(bytes)
}

func (sizer *jsonSizer) SizeOfFlatPcommonMapInNumberStringBool(input pcommon.Map) (int, int, int) {
n := map[string]float64{}
s := map[string]string{}
b := map[string]bool{}

input.Range(func(k string, v pcommon.Value) bool {
switch v.Type() {
case pcommon.ValueTypeDouble:
if utils.IsValidFloat(v.Double()) {
n[k] = v.Double()
}
case pcommon.ValueTypeInt:
n[k] = float64(v.Int())
case pcommon.ValueTypeBool:
b[k] = v.Bool()
case pcommon.ValueTypeMap:
flattened := flatten.FlattenJSON(v.Map().AsRaw(), k)
for kf, vf := range flattened {
switch tvf := vf.(type) {
case string:
s[kf] = tvf
case float64:
n[kf] = tvf
case bool:
b[kf] = tvf
}
}
default:
s[k] = v.AsString()
}

return true
})

nbytes, err := json.Marshal(n)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
nbytes = []byte(nil)
}

sbytes, err := json.Marshal(s)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
sbytes = []byte(nil)
}

bbytes, err := json.Marshal(b)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
bbytes = []byte(nil)
}

return len(nbytes), len(sbytes), len(bbytes)
}

func (sizer *jsonSizer) SizeOfInt(input int) int {
return len(strconv.Itoa(input))
}

func (sizer *jsonSizer) SizeOfFloat64(input float64) int {
return len(strconv.FormatFloat(input, 'f', -1, 64))
}

func (sizer *jsonSizer) SizeOfTraceID(input pcommon.TraceID) int {
if input.IsEmpty() {
return 0
}

// Since we encode to hex, the original 16 bytes are stored in 32 bytes
return 32
}

func (sizer *jsonSizer) SizeOfSpanID(input pcommon.SpanID) int {
if input.IsEmpty() {
return 0
}

// Since we encode to hex, the original 8 bytes are stored in 16 bytes
return 16
}

func (sizer *jsonSizer) SizeOfEvents(input []string) int {
bytes, err := json.Marshal(input)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
return 0
}

return len(bytes)
}

func (sizer *jsonSizer) SizeOfOtelSpanRefs(input []traces.OtelSpanRef) int {
if input == nil {
return 0
}

bytes, err := json.Marshal(input)
if err != nil {
sizer.Logger.Error("cannot marshal object, setting size to 0", zap.Error(err), zap.Any("obj", input))
return 0
}

escapeCharacters := strings.Count(string(bytes), "\"")
return len(bytes) + escapeCharacters
}

func (size *jsonSizer) TotalSizeIfKeyExists(key int, value int, extra int) int {
if value == 0 {
return 0
}

return key + value + extra
}

func (size *jsonSizer) TotalSizeIfKeyExistsAndValueIsMapOrSlice(key int, value int, extra int) int {
if value <= 2 {
return 0
}

return key + value + extra
}
12 changes: 12 additions & 0 deletions pkg/metering/meter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metering

import (
"github.com/SigNoz/signoz-otel-collector/pkg/schema/traces"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -18,6 +20,16 @@ type Meter[T ptrace.Traces | pmetric.Metrics | plog.Logs] interface {
// Sizer is an interface that calculates the size of different of map[string]any
type Sizer interface {
SizeOfMapStringAny(map[string]any) int
SizeOfFlatPcommonMapInMapStringString(pcommon.Map) int
SizeOfInt(int) int
SizeOfFloat64(float64) int
SizeOfTraceID(pcommon.TraceID) int
SizeOfSpanID(pcommon.SpanID) int
SizeOfFlatPcommonMapInNumberStringBool(pcommon.Map) (int, int, int)
SizeOfEvents([]string) int
SizeOfOtelSpanRefs([]traces.OtelSpanRef) int
TotalSizeIfKeyExists(int, int, int) int
TotalSizeIfKeyExistsAndValueIsMapOrSlice(int, int, int) int
}

// Logs calculates billable metrics for logs.
Expand Down
4 changes: 0 additions & 4 deletions pkg/metering/v1/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ func NewLogs(logger *zap.Logger) metering.Logs {
}

func (meter *logs) Size(ld plog.Logs) int {

meter.Logger.Debug("Calculating logs size")

total := 0
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
Expand All @@ -40,7 +37,6 @@ func (meter *logs) Size(ld plog.Logs) int {

}
}
meter.Logger.Debug("Logs size", zap.Int("size", total))

return total
}
Expand Down
Loading

0 comments on commit a9b7c29

Please sign in to comment.