diff --git a/.travis.yml b/.travis.yml index 904805d..041b6a8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,8 @@ script: - export GO111MODULE=on - go mod download - golangci-lint run - - go test -v -covermode atomic -race -timeout 60s -cpu 4 -parallel 8 -coverprofile ./coverage.out ./... + - go test -v -covermode atomic -race -timeout 60s -cpu 4 -parallel 8 -coverprofile ./coverage.out $(go list ./... | grep -v translator) + - go test -v -covermode atomic -race -timeout 60s -cpu 4 -parallel 8 $(go list ./... | grep -i translator) - "! go tool cover -func coverage.out | grep -v 100.0%" after_script: diff --git a/go.mod b/go.mod index a0bd63f..23c32b8 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/signalfx/golib/v3 go 1.13 +replace git.apache.org/thrift.git => github.com/signalfx/thrift v0.0.0-20181211001559-3838fa316492 + require ( github.com/boltdb/bolt v1.3.1 github.com/dropbox/godropbox v0.0.0-20180512210157-31879d3884b9 @@ -9,7 +11,9 @@ require ( github.com/go-kit/kit v0.9.0 github.com/go-logfmt/logfmt v0.4.0 github.com/go-stack/stack v1.8.0 + github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 + github.com/jaegertracing/jaeger v1.15.1 github.com/juju/errors v0.0.0-20181012004132-a4583d0a56ea github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e github.com/opentracing/opentracing-go v1.1.0 diff --git a/go.sum b/go.sum index 084fa52..4e19cb9 100644 --- a/go.sum +++ b/go.sum @@ -91,11 +91,10 @@ github.com/signalfx/com_signalfx_metrics_protobuf v0.0.0-20190222193949-1fb69526 github.com/signalfx/com_signalfx_metrics_protobuf v0.0.0-20190222193949-1fb69526e884/go.mod h1:muYA2clvwCdj7nzAJ5vJIXYpJsUumhAl4Uu1wUNpWzA= github.com/signalfx/gohistogram v0.0.0-20160107210732-1ccfd2ff5083 h1:WsShHmu12ZztYPfh9b+I+VjYD1o8iOHhB67WZCMEEE8= github.com/signalfx/gohistogram v0.0.0-20160107210732-1ccfd2ff5083/go.mod h1:adPDS6s7WaajdFBV9mQ7i0dKfQ8xiDnF9ZNETVPpp7c= +github.com/signalfx/golib v2.5.1+incompatible h1:rw16Flfx5Z29DahDSNGAA3ch8dOeNc3oOMUJm493yao= github.com/signalfx/golib/v3 v3.0.0/go.mod h1:p+krygP/cDlWvCBEgdkQp3H16rbP4NW7YQa81TDMRe8= github.com/signalfx/gomemcache v0.0.0-20180823214636-4f7ef64c72a9 h1:M3hb9iDDlB3LrR1xI6wiWIA75Ol9eenD5fbV5d3bxjc= github.com/signalfx/gomemcache v0.0.0-20180823214636-4f7ef64c72a9/go.mod h1:Ytb8KfCSyuwy/VILnROdgCvbQLA5ch0nkbG7lKT0BXw= -github.com/signalfx/sapm-proto v0.1.1-0.20191217203625-02349a3badae h1:plYuQx78OfsLSQxWxq6NroClbuQhZ4BAF9no5zPhWP4= -github.com/signalfx/sapm-proto v0.1.1-0.20191217203625-02349a3badae/go.mod h1:X/wS1ofuOAW+OTFhCALiHVZvihqMDiNPhzmbusWCQi8= github.com/signalfx/sapm-proto v0.2.0 h1:uaNhSN9qE8g16YtXSbryPmbkRiyCt8Bw4XTjV/MC5Xg= github.com/signalfx/sapm-proto v0.2.0/go.mod h1:X/wS1ofuOAW+OTFhCALiHVZvihqMDiNPhzmbusWCQi8= github.com/signalfx/thrift v0.0.0-20181211001559-3838fa316492 h1:xWH5x1J8QgMdxs1q+jcuVyBKmazAee82UWeS9BPMxzY= diff --git a/sfxclient/README.md b/sfxclient/README.md index 22ca8b3..c5da48b 100644 --- a/sfxclient/README.md +++ b/sfxclient/README.md @@ -1,6 +1,6 @@ # sfxclient - import "github.com/signalfx/golib/sfxclient" + import "github.com/signalfx/golib/v3/sfxclient" Package signalfx creates convenient go functions and wrappers to send metrics to SignalFx. diff --git a/translator/batcher.go b/translator/batcher.go new file mode 100644 index 0000000..e1ba6ff --- /dev/null +++ b/translator/batcher.go @@ -0,0 +1,81 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "crypto/sha256" + "fmt" + + "github.com/gogo/protobuf/proto" + jaegerpb "github.com/jaegertracing/jaeger/model" +) + +type bucketID [32]byte + +// spanBatcher is simpler version of OpenTelemetry's Node Batcher. +// spanBatcher takes spans and groups them into Jaeger Batches using +// the Span Process objects. +type spanBatcher struct { + buckets map[bucketID]*jaegerpb.Batch +} + +func (b *spanBatcher) add(span *jaegerpb.Span) { + if b.buckets == nil { + b.buckets = make(map[bucketID]*jaegerpb.Batch) + } + + batchByProcess := span.Process + id, err := b.genBucketID(span.Process) + if err != nil { + batchByProcess = nil + } + + batch := b.getOrAddBatch(id, batchByProcess) + if batch.Process != nil { + span.Process = nil + } + batch.Spans = append(batch.Spans, span) +} + +func (b *spanBatcher) batches() []*jaegerpb.Batch { + batches := make([]*jaegerpb.Batch, 0, len(b.buckets)) + for _, b := range b.buckets { + batches = append(batches, b) + } + return batches +} + +func (b *spanBatcher) genBucketID(process *jaegerpb.Process) (bucketID, error) { + if process != nil { + sortTags(process.Tags) + key, err := proto.Marshal(process) + if err != nil { + return bucketID{}, fmt.Errorf("error generating bucket ID: %s", err.Error()) + } + return sha256.Sum256(key), nil + } + return bucketID{}, nil +} + +func (b *spanBatcher) getOrAddBatch(id bucketID, p *jaegerpb.Process) *jaegerpb.Batch { + batch, ok := b.buckets[id] + if !ok { + batch = &jaegerpb.Batch{ + Process: p, + } + b.buckets[id] = batch + } + return batch +} diff --git a/translator/batcher_test.go b/translator/batcher_test.go new file mode 100644 index 0000000..eceeaa2 --- /dev/null +++ b/translator/batcher_test.go @@ -0,0 +1,120 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "testing" + + jaegerpb "github.com/jaegertracing/jaeger/model" + "github.com/stretchr/testify/assert" +) + +func TestBatcher(t *testing.T) { + p1 := &jaegerpb.Process{ + ServiceName: "s1", + Tags: []jaegerpb.KeyValue{ + { + Key: "k1", + VType: jaegerpb.ValueType_STRING, + VStr: "v1", + }, + { + Key: "k2", + VType: jaegerpb.ValueType_STRING, + VStr: "v2", + }, + }, + } + p2 := &jaegerpb.Process{ + ServiceName: "s1", + Tags: []jaegerpb.KeyValue{ + { + Key: "k2", + VType: jaegerpb.ValueType_STRING, + VStr: "v2", + }, + { + Key: "k1", + VType: jaegerpb.ValueType_STRING, + VStr: "v1", + }, + }, + } + p3 := &jaegerpb.Process{ServiceName: "s2"} + p4 := &jaegerpb.Process{ServiceName: "s3"} + + spans := []*jaegerpb.Span{ + {Process: p1, SpanID: jaegerpb.SpanID(1)}, + {Process: p1, SpanID: jaegerpb.SpanID(2)}, + {Process: p2, SpanID: jaegerpb.SpanID(3)}, + {Process: p3, SpanID: jaegerpb.SpanID(4)}, + {Process: p3, SpanID: jaegerpb.SpanID(5)}, + {Process: p4, SpanID: jaegerpb.SpanID(6)}, + {Process: p4, SpanID: jaegerpb.SpanID(7)}, + {SpanID: jaegerpb.SpanID(8)}, + {SpanID: jaegerpb.SpanID(9)}, + } + + b := &spanBatcher{} + for _, s := range spans { + b.add(s) + } + + batches := b.batches() + assert.Len(t, batches, 4) + + b1 := findBatchWithProcessServiceName(batches, p1) + assert.NotNil(t, b1) + assert.Equal(t, b1.Process, p1) + assertSpansAreEqual(t, b1.Spans, []*jaegerpb.Span{spans[0], spans[1], spans[2]}) + + b2 := findBatchWithProcessServiceName(batches, p2) + assert.Equal(t, b1, b2) + + b3 := findBatchWithProcessServiceName(batches, p3) + assert.NotNil(t, b3) + assert.Equal(t, b3.Process, p3) + assertSpansAreEqual(t, b3.Spans, []*jaegerpb.Span{spans[3], spans[4]}) + + b4 := findBatchWithProcessServiceName(batches, p4) + assert.NotNil(t, b4) + assert.Equal(t, b4.Process, p4) + assertSpansAreEqual(t, b4.Spans, []*jaegerpb.Span{spans[5], spans[6]}) + + var nilProcess *jaegerpb.Process + b5 := findBatchWithProcessServiceName(batches, nil) + assert.NotNil(t, b5) + assert.Equal(t, b5.Process, nilProcess) + assertSpansAreEqual(t, b5.Spans, []*jaegerpb.Span{spans[7], spans[8]}) + + for _, s := range spans { + assert.Nil(t, s.Process) + } +} + +func findBatchWithProcessServiceName(batches []*jaegerpb.Batch, p *jaegerpb.Process) *jaegerpb.Batch { + for _, b := range batches { + if p == nil { + if b.Process == nil { + return b + } + } else { + if b.Process != nil && b.Process.ServiceName == p.ServiceName { + return b + } + } + } + return nil +} diff --git a/translator/constants.go b/translator/constants.go new file mode 100644 index 0000000..bfe0e5d --- /dev/null +++ b/translator/constants.go @@ -0,0 +1,26 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +const ( + peerHostIPv4 = "peer.ipv4" + peerHostIPv6 = "peer.ipv6" + peerPort = "peer.port" + spanKind = "span.kind" + spanKindRPCClient = "client" + spanKindRPCServer = "server" + spanKindProducer = "producer" + spanKindConsumer = "consumer" +) diff --git a/translator/grpc_http_mapper.go b/translator/grpc_http_mapper.go new file mode 100644 index 0000000..012fc93 --- /dev/null +++ b/translator/grpc_http_mapper.go @@ -0,0 +1,100 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "net/http" +) + +// This code is taken from OpenTelemetry project in order to avoid adding the whole project as a dependency. + +// https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186 +const ( + OCOK = 0 + OCCancelled = 1 + OCUnknown = 2 + OCInvalidArgument = 3 + OCDeadlineExceeded = 4 + OCNotFound = 5 + OCAlreadyExists = 6 + OCPermissionDenied = 7 + OCResourceExhausted = 8 + OCFailedPrecondition = 9 + OCAborted = 10 + OCOutOfRange = 11 + OCUnimplemented = 12 + OCInternal = 13 + OCUnavailable = 14 + OCDataLoss = 15 + OCUnauthenticated = 16 +) + +var httpToOCCodeMap = map[int32]int32{ + 401: OCUnauthenticated, + 403: OCPermissionDenied, + 404: OCNotFound, + 429: OCResourceExhausted, + 499: OCCancelled, + 501: OCUnimplemented, + 503: OCUnavailable, + 504: OCDeadlineExceeded, +} + +// OCStatusCodeFromHTTP takes an HTTP status code and return the appropriate OpenTelemetry status code +// See: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-http.md +func OCStatusCodeFromHTTP(code int32) int32 { + if code >= 100 && code < 400 { + return OCOK + } + if rvCode, ok := httpToOCCodeMap[code]; ok { + return rvCode + } + if code >= 400 && code < 500 { + return OCInvalidArgument + } + if code >= 500 && code < 600 { + return OCInternal + } + return OCUnknown +} + +var ocToHTTPCodeMap = map[int32]int32{ + OCOK: http.StatusOK, + OCCancelled: 499, + OCUnknown: http.StatusInternalServerError, + OCInvalidArgument: http.StatusBadRequest, + OCDeadlineExceeded: http.StatusGatewayTimeout, + OCNotFound: http.StatusNotFound, + OCAlreadyExists: http.StatusConflict, + OCPermissionDenied: http.StatusForbidden, + OCResourceExhausted: http.StatusTooManyRequests, + OCFailedPrecondition: http.StatusPreconditionFailed, + OCAborted: http.StatusConflict, + OCOutOfRange: http.StatusRequestedRangeNotSatisfiable, + OCUnimplemented: http.StatusNotImplemented, + OCInternal: http.StatusInternalServerError, + OCUnavailable: http.StatusServiceUnavailable, + OCDataLoss: http.StatusUnprocessableEntity, + OCUnauthenticated: http.StatusUnauthorized, +} + +// HTTPStatusCodeFromOCStatus takes an OpenTelemetry status code and return the appropriate HTTP status code +// See: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-http.md +func HTTPStatusCodeFromOCStatus(code int32) int32 { + if rvCode, ok := ocToHTTPCodeMap[code]; ok { + return rvCode + } + return http.StatusInternalServerError +} diff --git a/translator/grpc_http_mapper_test.go b/translator/grpc_http_mapper_test.go new file mode 100644 index 0000000..d2a5568 --- /dev/null +++ b/translator/grpc_http_mapper_test.go @@ -0,0 +1,35 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHTTPStatusFromOTStatus(t *testing.T) { + for otelStatus := int32(OCOK); otelStatus <= OCUnauthenticated; otelStatus++ { + httpStatus := HTTPStatusCodeFromOCStatus(otelStatus) + assert.True(t, httpStatus != 0) + } +} + +func TestOTStatusFromHTTPStatus(t *testing.T) { + for httpStatus := int32(100); httpStatus <= 604; httpStatus++ { + otelStatus := OCStatusCodeFromHTTP(httpStatus) + assert.True(t, otelStatus >= OCOK && otelStatus <= OCUnauthenticated) + } +} diff --git a/translator/sfx.go b/translator/sfx.go new file mode 100644 index 0000000..ef96156 --- /dev/null +++ b/translator/sfx.go @@ -0,0 +1,251 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "encoding/json" + "sort" + "time" + + jaegerpb "github.com/jaegertracing/jaeger/model" + "github.com/signalfx/golib/v3/trace" + + gen "github.com/signalfx/sapm-proto/gen" +) + +const ( + clientKind = "CLIENT" + serverKind = "SERVER" + producerKind = "PRODUCER" + consumerKind = "CONSUMER" + + tagJaegerVersion = "jaeger.version" + tagIP = "ip" + tagHostname = "hostname" + + nanosInOneMicro = time.Microsecond +) + +// SFXToSAPMPostRequest takes a slice spans in the SignalFx format and converts it to a SAPM PostSpansRequest +func SFXToSAPMPostRequest(spans []*trace.Span) *gen.PostSpansRequest { + sr := &gen.PostSpansRequest{} + + batcher := spanBatcher{} + + for _, sfxSpan := range spans { + span := sapmSpanFromSFXSpan(sfxSpan) + batcher.add(span) + } + + sr.Batches = batcher.batches() + return sr +} + +func getLocalEndpointInfo(sfxSpan *trace.Span, span *jaegerpb.Span) { + if sfxSpan.LocalEndpoint != nil { + if sfxSpan.LocalEndpoint.ServiceName != nil { + span.Process.ServiceName = *sfxSpan.LocalEndpoint.ServiceName + } + if sfxSpan.LocalEndpoint.Ipv4 != nil { + span.Process.Tags = append(span.Process.Tags, jaegerpb.KeyValue{ + Key: "ip", + VType: jaegerpb.ValueType_STRING, + VStr: *sfxSpan.LocalEndpoint.Ipv4, + }) + } + } +} + +func sapmSpanFromSFXSpan(sfxSpan *trace.Span) *jaegerpb.Span { + spanID, err := jaegerpb.SpanIDFromString(sfxSpan.ID) + if err != nil { + return nil + } + + traceID, err := jaegerpb.TraceIDFromString(sfxSpan.TraceID) + if err != nil { + return nil + } + + span := &jaegerpb.Span{ + SpanID: spanID, + TraceID: traceID, + Process: &jaegerpb.Process{}, + } + + if sfxSpan.Name != nil { + span.OperationName = *sfxSpan.Name + } + + if sfxSpan.Duration != nil { + span.Duration = durationFromMicroseconds(*sfxSpan.Duration) + } + + if sfxSpan.Timestamp != nil { + span.StartTime = timeFromMicrosecondsSinceEpoch(*sfxSpan.Timestamp) + } + + if sfxSpan.Debug != nil && *sfxSpan.Debug { + span.Flags.SetDebug() + } + + span.Tags, span.Process.Tags = sfxTagsToJaegerTags(sfxSpan.Tags, sfxSpan.RemoteEndpoint, sfxSpan.Kind) + + getLocalEndpointInfo(sfxSpan, span) + + if sfxSpan.ParentID != nil { + parentID, err := jaegerpb.SpanIDFromString(*sfxSpan.ParentID) + if err == nil { + span.References = append(span.References, jaegerpb.SpanRef{ + TraceID: traceID, + SpanID: parentID, + RefType: jaegerpb.SpanRefType_CHILD_OF, + }) + } + } + + span.Logs = sfxAnnotationsToJaegerLogs(sfxSpan.Annotations) + return span +} + +func sfxTagsToJaegerTags(tags map[string]string, remoteEndpoint *trace.Endpoint, kind *string) ([]jaegerpb.KeyValue, []jaegerpb.KeyValue) { + processTags := make([]jaegerpb.KeyValue, 0, len(tags)) + spanTags := make([]jaegerpb.KeyValue, 0, len(tags)+3) + + if remoteEndpoint != nil { + if remoteEndpoint.Ipv4 != nil { + spanTags = append(spanTags, jaegerpb.KeyValue{ + Key: peerHostIPv4, + VType: jaegerpb.ValueType_STRING, + VStr: *remoteEndpoint.Ipv4, + }) + } + + if remoteEndpoint.Ipv6 != nil { + spanTags = append(spanTags, jaegerpb.KeyValue{ + Key: peerHostIPv6, + VType: jaegerpb.ValueType_STRING, + VStr: *remoteEndpoint.Ipv6, + }) + } + + if remoteEndpoint.Port != nil { + spanTags = append(spanTags, jaegerpb.KeyValue{ + Key: peerPort, + VType: jaegerpb.ValueType_INT64, + VInt64: int64(*remoteEndpoint.Port), + }) + } + } + + if kind != nil { + spanTags = append(spanTags, sfxKindToJaeger(*kind)) + } + + for k, v := range tags { + kv := jaegerpb.KeyValue{ + Key: k, + VType: jaegerpb.ValueType_STRING, + VStr: v, + } + switch k { + case tagJaegerVersion, tagHostname, tagIP: + processTags = append(processTags, kv) + default: + spanTags = append(spanTags, kv) + } + } + + return spanTags, processTags +} + +func sfxAnnotationsToJaegerLogs(annotations []*trace.Annotation) []jaegerpb.Log { + logs := make([]jaegerpb.Log, 0, len(annotations)) + for _, ann := range annotations { + if ann.Value == nil { + continue + } + log := jaegerpb.Log{} + if ann.Timestamp != nil { + log.Timestamp = timeFromMicrosecondsSinceEpoch(*ann.Timestamp) + } + var err error + log.Fields, err = fieldsFromJSONString(*ann.Value) + if err != nil { + continue + } + logs = append(logs, log) + } + return logs +} + +func fieldsFromJSONString(jStr string) ([]jaegerpb.KeyValue, error) { + fields := make(map[string]string) + kv := make([]jaegerpb.KeyValue, 0, len(fields)) + err := json.Unmarshal([]byte(jStr), &fields) + if err != nil { + kv = append(kv, jaegerpb.KeyValue{ + Key: "event", + VType: jaegerpb.ValueType_STRING, + VStr: jStr, + }) + return kv, err + } + + for k, v := range fields { + kv = append(kv, jaegerpb.KeyValue{ + Key: k, + VType: jaegerpb.ValueType_STRING, + VStr: v, + }) + } + return kv, nil +} + +func sfxKindToJaeger(kind string) jaegerpb.KeyValue { + kv := jaegerpb.KeyValue{ + Key: spanKind, + } + + switch kind { + case clientKind: + kv.VStr = spanKindRPCClient + case serverKind: + kv.VStr = spanKindRPCServer + case producerKind: + kv.VStr = spanKindProducer + case consumerKind: + kv.VStr = spanKindConsumer + } + return kv +} + +func durationFromMicroseconds(micros int64) time.Duration { + return time.Duration(micros) * nanosInOneMicro +} + +func timeFromMicrosecondsSinceEpoch(micros int64) time.Time { + nanos := micros * int64(nanosInOneMicro) + return time.Unix(0, nanos).UTC() +} + +func sortTags(t []jaegerpb.KeyValue) { + if t == nil { + return + } + sort.Slice(t, func(i, j int) bool { + return t[i].Key <= t[j].Key + }) +} diff --git a/translator/sfx_test.go b/translator/sfx_test.go new file mode 100644 index 0000000..55a8e70 --- /dev/null +++ b/translator/sfx_test.go @@ -0,0 +1,606 @@ +// Copyright 2019 Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "sort" + "testing" + "time" + + jaegerpb "github.com/jaegertracing/jaeger/model" + "github.com/signalfx/golib/v3/pointer" + "github.com/signalfx/golib/v3/trace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + sapmpb "github.com/signalfx/sapm-proto/gen" +) + +var ( + ClientKind = "CLIENT" + ServerKind = "SERVER" + ProducerKind = "PRODUCER" + ConsumerKind = "CONSUMER" +) + +func TestTimeTranslator(t *testing.T) { + timeInMicros := int64(1575986204988181) + + want, err := time.Parse(time.RFC3339Nano, "2019-12-10T13:56:44.988181Z") + require.NoError(t, err) + + got := timeFromMicrosecondsSinceEpoch(timeInMicros).UTC() + assert.Equal(t, want, got) +} + +func TestDurationTranslator(t *testing.T) { + cases := map[int64]time.Duration{ + 1: time.Microsecond, + 6e+7: time.Minute, + 3.6e+9: time.Hour, + } + + for ms, want := range cases { + got := durationFromMicroseconds(ms) + assert.Equal(t, want, got) + } +} + +func TestTranslator(t *testing.T) { + got := SFXToSAPMPostRequest(sourceSpans) + require.Equal(t, len(wantPostRequest.Batches), len(got.Batches)) + sortBatches(wantPostRequest.Batches) + sortBatches(got.Batches) + + for i := 0; i < len(got.Batches); i++ { + assertBatchesAreEqual(t, got.Batches[i], wantPostRequest.Batches[i]) + } + +} + +func assertBatchesAreEqual(t *testing.T, got, want *jaegerpb.Batch) { + require.Equal(t, len(got.Spans), len(want.Spans)) + assertProcessesAreEqual(t, got.Process, want.Process) + assertSpansAreEqual(t, got.Spans, want.Spans) + +} + +func assertProcessesAreEqual(t *testing.T, got, want *jaegerpb.Process) { + sortTags(want.Tags) + sortTags(got.Tags) + assert.Equal(t, got, want) +} + +func assertSpansAreEqual(t *testing.T, got, want []*jaegerpb.Span) { + sortSpans(got) + sortSpans(want) + for i := 0; i < len(got); i++ { + require.Equal(t, sortedSpan(got[i]), sortedSpan(want[i])) + } +} + +func sortBatches(batches []*jaegerpb.Batch) { + sort.Slice(batches, func(i, j int) bool { + s1, s2 := "", "" + if batches[i].Process != nil { + s1 = batches[i].Process.ServiceName + } + if batches[j].Process != nil { + s2 = batches[j].Process.ServiceName + } + return s1 <= s2 + }) +} + +func sortSpans(spans []*jaegerpb.Span) { + sort.Slice(spans, func(i, j int) bool { + return spans[i].SpanID < spans[j].SpanID + }) +} + +func sortRefs(t []jaegerpb.SpanRef) { + sort.Slice(t, func(i, j int) bool { + return t[i].String() <= t[j].String() + }) +} + +func sortedSpan(s *jaegerpb.Span) *jaegerpb.Span { + sortLogs(s.Logs) + sortTags(s.Tags) + sortRefs(s.References) + sort.Strings(s.Warnings) + return s +} + +func sortLogs(t []jaegerpb.Log) { + sort.Slice(t, func(i, j int) bool { + return t[i].String() <= t[j].String() + }) + for _, l := range t { + sortTags(l.Fields) + } +} + +var wantPostRequest = sapmpb.PostSpansRequest{ + Batches: []*jaegerpb.Batch{ + { + Process: &jaegerpb.Process{ + ServiceName: "api1", + Tags: []jaegerpb.KeyValue{ + { + Key: "ip", + VType: jaegerpb.ValueType_STRING, + VStr: "10.53.69.61", + }, + { + Key: "hostname", + VType: jaegerpb.ValueType_STRING, + VStr: "api246-sjc1", + }, + { + Key: "jaeger.version", + VType: jaegerpb.ValueType_STRING, + VStr: "Python-3.1.0", + }, + }, + }, + Spans: []*jaegerpb.Span{ + { + SpanID: jaegerpb.SpanID(0x147d98), + TraceID: jaegerpb.TraceID{Low: 11715721395283892799}, + OperationName: "get", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Duration(22938000), + Flags: 0, + Process: nil, + ProcessID: "", + References: []jaegerpb.SpanRef{ + { + TraceID: jaegerpb.TraceID{Low: 11715721395283892799}, + SpanID: jaegerpb.SpanID(0x68c4e3), + RefType: jaegerpb.SpanRefType_CHILD_OF, + }, + }, + Tags: []jaegerpb.KeyValue{ + { + Key: "peer.ipv4", + VType: jaegerpb.ValueType_STRING, + VStr: "192.53.69.61", + }, + { + Key: "peer.port", + VType: jaegerpb.ValueType_INT64, + VInt64: 53931, + }, + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "server", + }, + { + Key: "someFalseBool", + VType: jaegerpb.ValueType_STRING, + VStr: "false", + }, + { + Key: "someDouble", + VType: jaegerpb.ValueType_STRING, + VStr: "129.8", + }, + { + Key: "http.url", + VType: jaegerpb.ValueType_STRING, + VStr: "http://127.0.0.1:15598/client_transactions", + }, + { + Key: "someBool", + VType: jaegerpb.ValueType_STRING, + VStr: "true", + }, + }, + Logs: []jaegerpb.Log{ + { + Timestamp: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Fields: []jaegerpb.KeyValue{ + { + Key: "key1", + VType: jaegerpb.ValueType_STRING, + VStr: "value1", + }, + { + Key: "key2", + VType: jaegerpb.ValueType_STRING, + VStr: "value2", + }, + }, + }, + }, + Warnings: nil, + }, + { + TraceID: jaegerpb.TraceID{Low: 12868642899890739775, High: 1}, + SpanID: jaegerpb.SpanID(0x21d092272e), + OperationName: "post", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Microsecond * 22938, + References: []jaegerpb.SpanRef{{ + TraceID: jaegerpb.TraceID{Low: 12868642899890739775, High: 1}, + SpanID: jaegerpb.SpanID(6866147), + RefType: jaegerpb.SpanRefType_CHILD_OF, + }}, + Tags: []jaegerpb.KeyValue{ + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "client", + }, + { + Key: "peer.port", + VType: jaegerpb.ValueType_INT64, + VInt64: 53931, + }, + { + Key: "peer.ipv4", + VType: jaegerpb.ValueType_STRING, + VStr: "10.0.0.1", + }, + }, + Logs: []jaegerpb.Log{}, + }, { + TraceID: jaegerpb.TraceID{Low: 14021564404497586751}, + SpanID: jaegerpb.SpanID(213952636718), + OperationName: "post", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Microsecond * 22938, + References: []jaegerpb.SpanRef{{ + TraceID: jaegerpb.TraceID{Low: 14021564404497586751}, + SpanID: jaegerpb.SpanID(6866147), + RefType: jaegerpb.SpanRefType_CHILD_OF, + }}, + Tags: []jaegerpb.KeyValue{ + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "consumer", + }, + }, + Logs: []jaegerpb.Log{}, + }, { + + TraceID: jaegerpb.TraceID{Low: 15174485909104433727}, + SpanID: jaegerpb.SpanID(47532398882098234), + OperationName: "post", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Microsecond * 22938, + Flags: 2, + References: []jaegerpb.SpanRef{ + { + RefType: jaegerpb.SpanRefType_CHILD_OF, + TraceID: jaegerpb.TraceID{Low: 15174485909104433727}, + SpanID: jaegerpb.SpanID(6866147), + }, + }, + Tags: []jaegerpb.KeyValue{ + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "producer", + }, + { + Key: "peer.ipv6", + VType: jaegerpb.ValueType_STRING, + VStr: "::1", + }, + }, + Logs: []jaegerpb.Log{}, + }, { + + TraceID: jaegerpb.TraceID{Low: 16327407413711280703}, + SpanID: jaegerpb.SpanID(52035998509468730), + OperationName: "post", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Microsecond * 22938, + Flags: 2, + References: []jaegerpb.SpanRef{ + { + RefType: jaegerpb.SpanRefType_CHILD_OF, + TraceID: jaegerpb.TraceID{Low: 16327407413711280703}, + SpanID: jaegerpb.SpanID(7914723), + }, + }, + Tags: []jaegerpb.KeyValue{ + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "producer", + }, + { + Key: "peer.ipv6", + VType: jaegerpb.ValueType_STRING, + VStr: "::1", + }, + { + Key: "elements", + VType: jaegerpb.ValueType_STRING, + VStr: "100", + }, + }, + Logs: []jaegerpb.Log{}, + }, + }, + }, { + Process: &jaegerpb.Process{ + ServiceName: "api2", + Tags: []jaegerpb.KeyValue{ + { + Key: "ip", + VType: jaegerpb.ValueType_STRING, + VStr: "10.53.69.70", + }, + { + Key: "jaeger.version", + VType: jaegerpb.ValueType_STRING, + VStr: "Python-3.6.0", + }, + { + Key: "hostname", + VType: jaegerpb.ValueType_STRING, + VStr: "api2-233", + }, + }, + }, + Spans: []*jaegerpb.Span{ + { + TraceID: jaegerpb.TraceID{Low: 17480328918319176255}, + SpanID: jaegerpb.SpanID(58525199627357242), + OperationName: "post", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Microsecond * 22938, + Flags: 2, + References: []jaegerpb.SpanRef{ + { + RefType: jaegerpb.SpanRefType_CHILD_OF, + TraceID: jaegerpb.TraceID{Low: 17480328918319176255}, + SpanID: jaegerpb.SpanID(0x35c4e2), + }, + }, + Tags: []jaegerpb.KeyValue{ + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "producer", + }, + { + Key: "peer.ipv6", + VType: jaegerpb.ValueType_STRING, + VStr: "::1", + }, + }, + Logs: []jaegerpb.Log{}, + }, + }, + }, { + Process: &jaegerpb.Process{ + ServiceName: "api3", + Tags: []jaegerpb.KeyValue{ + { + Key: "ip", + VType: jaegerpb.ValueType_STRING, + VStr: "10.53.67.53", + }, + { + Key: "jaeger.version", + VType: jaegerpb.ValueType_STRING, + VStr: "Python-3.6.0", + }, + { + Key: "hostname", + VType: jaegerpb.ValueType_STRING, + VStr: "api3-sjc1", + }, + }, + }, + Spans: []*jaegerpb.Span{ + { + TraceID: jaegerpb.TraceID{Low: 18025686685695023674}, + SpanID: jaegerpb.SpanID(63028799254727738), + OperationName: "get", + StartTime: time.Date(2017, 01, 26, 21, 46, 31, 639875000, time.UTC), + Duration: time.Microsecond * 22938, + Tags: []jaegerpb.KeyValue{ + { + Key: "span.kind", + VType: jaegerpb.ValueType_STRING, + VStr: "client", + }, + { + Key: "peer.ipv6", + VType: jaegerpb.ValueType_STRING, + VStr: "::1", + }, + }, + Logs: []jaegerpb.Log{}, + }, + }, + }, + }, +} + +var sourceSpans = []*trace.Span{ + { + TraceID: "a2969a8955571a3f", + ParentID: pointer.String("000000000068c4e3"), + ID: "0000000000147d98", + Name: pointer.String("get"), + Kind: &ServerKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api1"), + Ipv4: pointer.String("10.53.69.61"), + }, + RemoteEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("rtapi"), + Ipv4: pointer.String("192.53.69.61"), + Port: pointer.Int32(53931), + }, + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Debug: nil, + Shared: nil, + Annotations: []*trace.Annotation{ + {Timestamp: pointer.Int64(1485467191639875), Value: pointer.String("{\"key1\":\"value1\",\"key2\":\"value2\"}")}, + {Timestamp: pointer.Int64(1485467191639875), Value: pointer.String("nothing")}, + }, + Tags: map[string]string{ + "http.url": "http://127.0.0.1:15598/client_transactions", + "someBool": "true", + "someFalseBool": "false", + "someDouble": "129.8", + "hostname": "api246-sjc1", + "jaeger.version": "Python-3.1.0", + }, + }, + { + TraceID: "0000000000000001b2969a8955571a3f", + ParentID: pointer.String("000000000068c4e3"), + ID: "00000021d092272e", + Name: pointer.String("post"), + Kind: &ClientKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api1"), + Ipv4: pointer.String("10.53.69.61"), + }, + RemoteEndpoint: &trace.Endpoint{ + Ipv4: pointer.String("10.0.0.1"), + Port: pointer.Int32(53931), + }, + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Debug: nil, + Shared: nil, + Annotations: []*trace.Annotation{}, + Tags: map[string]string{ + "hostname": "api246-sjc1", + "jaeger.version": "Python-3.1.0", + }, + }, + { + TraceID: "c2969a8955571a3f", + ParentID: pointer.String("000000000068c4e3"), + ID: "0031d092272e", + Name: pointer.String("post"), + Kind: &ConsumerKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api1"), + Ipv4: pointer.String("10.53.69.61"), + }, + RemoteEndpoint: nil, + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Debug: nil, + Shared: nil, + Annotations: []*trace.Annotation{}, + Tags: map[string]string{ + "hostname": "api246-sjc1", + "jaeger.version": "Python-3.1.0", + }, + }, + { + TraceID: "d2969a8955571a3f", + ParentID: pointer.String("000000000068c4e3"), + ID: "A8DE7706B08C3A", + Name: pointer.String("post"), + Kind: &ProducerKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api1"), + Ipv4: pointer.String("10.53.69.61"), + }, + RemoteEndpoint: &trace.Endpoint{ + Ipv6: pointer.String("::1"), + }, + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Debug: pointer.Bool(true), + Shared: nil, + Annotations: []*trace.Annotation{}, + Tags: map[string]string{ + "hostname": "api246-sjc1", + "jaeger.version": "Python-3.1.0", + }, + }, + { + TraceID: "e2969a8955571a3f", + ParentID: pointer.String("000000000078c4e3"), + ID: "B8DE7706B08C3A", + Name: pointer.String("post"), + Kind: &ProducerKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api1"), + Ipv4: pointer.String("10.53.69.61"), + }, + RemoteEndpoint: &trace.Endpoint{ + Ipv6: pointer.String("::1"), + }, + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Debug: pointer.Bool(true), + Shared: nil, + Annotations: []*trace.Annotation{}, + Tags: map[string]string{ + "elements": "100", + "hostname": "api246-sjc1", + "jaeger.version": "Python-3.1.0", + }, + }, + { + TraceID: "f2969a8955671a3f", + ParentID: pointer.String("000000000035c4e2"), + ID: "CFEC5BE6328C3A", + Name: pointer.String("post"), + Kind: &ProducerKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api2"), + Ipv4: pointer.String("10.53.69.70"), + }, + RemoteEndpoint: &trace.Endpoint{ + Ipv6: pointer.String("::1"), + }, + Debug: pointer.Bool(true), + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Tags: map[string]string{ + "hostname": "api2-233", + "jaeger.version": "Python-3.6.0", + }, + }, + { + TraceID: "fa281a8955571a3a", + ID: "DFEC5BE6328C3A", + Name: pointer.String("get"), + Kind: &ClientKind, + LocalEndpoint: &trace.Endpoint{ + ServiceName: pointer.String("api3"), + Ipv4: pointer.String("10.53.67.53"), + }, + RemoteEndpoint: &trace.Endpoint{ + Ipv6: pointer.String("::1"), + }, + Timestamp: pointer.Int64(1485467191639875), + Duration: pointer.Int64(22938), + Tags: map[string]string{ + "hostname": "api3-sjc1", + "jaeger.version": "Python-3.6.0", + }, + }, +}