Skip to content

Commit

Permalink
Merge pull request #20 from instana/issue_20
Browse files Browse the repository at this point in the history
OT spans should be reported using the sdk span type
  • Loading branch information
pglombardo authored Apr 18, 2017
2 parents 48d8f2e + f2f64f5 commit 78b5bce
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 60 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.DS_Store
debug.test
5 changes: 3 additions & 2 deletions custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
)

type CustomData struct {
Tags ot.Tags `json:"tags,omitempty"`
Logs map[uint64]map[string]interface{} `json:"logs,omitempty"`
Tags ot.Tags `json:"tags,omitempty"`
Logs map[uint64]map[string]interface{} `json:"logs,omitempty"`
Baggage map[string]string `json:"baggage,omitempty"`
}
7 changes: 2 additions & 5 deletions data.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package instana

type Data struct {
Service string `json:"service"`
HTTP *HTTPData `json:"http,omitempty"`
RPC *RPCData `json:"rpc,omitempty"`
Baggage map[string]string `json:"baggage,omitempty"`
Custom *CustomData `json:"custom,omitempty"`
Service string `json:"service,omitempty"`
SDK *SDKData `json:"sdk"`
}
119 changes: 71 additions & 48 deletions recorder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package instana

import (
"fmt"
"os"
"sync"
"time"
Expand All @@ -11,7 +12,8 @@ import (

type SpanRecorder struct {
sync.RWMutex
spans []Span
spans []Span
testMode bool
}

type Span struct {
Expand All @@ -25,15 +27,36 @@ type Span struct {
Data interface{} `json:"data"`
}

// NewRecorder Establish a new span recorder
func NewRecorder() *SpanRecorder {
r := new(SpanRecorder)
r.init()
return r
}

// NewTestRecorder Establish a new span recorder used for testing
func NewTestRecorder() *SpanRecorder {
r := new(SpanRecorder)
r.testMode = true
r.init()
return r
}

// GetSpans returns a copy of the array of spans accumulated so far.
func (r *SpanRecorder) GetSpans() []Span {
r.RLock()
defer r.RUnlock()
spans := make([]Span, len(r.spans))
copy(spans, r.spans)
return spans
}

func getTag(rawSpan basictracer.RawSpan, tag string) interface{} {
return rawSpan.Tags[tag]
var x, ok = rawSpan.Tags[tag]
if !ok {
x = ""
}
return x
}

func getIntTag(rawSpan basictracer.RawSpan, tag string) int {
Expand All @@ -51,12 +74,11 @@ func getIntTag(rawSpan basictracer.RawSpan, tag string) int {
}

func getStringTag(rawSpan basictracer.RawSpan, tag string) string {
d := getTag(rawSpan, tag)
d := rawSpan.Tags[tag]
if d == nil {
return ""
}

return d.(string)
return fmt.Sprint(d)
}

func getHostName(rawSpan basictracer.RawSpan) string {
Expand All @@ -74,24 +96,35 @@ func getHostName(rawSpan basictracer.RawSpan) string {
}

func getServiceName(rawSpan basictracer.RawSpan) string {
s := getStringTag(rawSpan, string(ext.Component))
if s == "" {
s = getStringTag(rawSpan, string(ext.PeerService))
if s == "" {
return sensor.serviceName
// ServiceName can be determined from multiple sources and has
// the following priority (preferred first):
// 1. If added to the span via the OT component tag
// 2. If added to the span via the OT http.url tag
// 3. Specified in the tracer instantiation via Service option
component := getStringTag(rawSpan, string(ext.Component))

if len(component) > 0 {
return component
} else if len(component) == 0 {
httpURL := getStringTag(rawSpan, string(ext.HTTPUrl))

if len(httpURL) > 0 {
return httpURL
}
}

return s
return sensor.serviceName
}

func getHTTPType(rawSpan basictracer.RawSpan) string {
func getSpanKind(rawSpan basictracer.RawSpan) string {
kind := getStringTag(rawSpan, string(ext.SpanKind))
if kind == string(ext.SpanKindRPCServerEnum) {
return HTTPServer
}

return HTTPClient
switch kind {
case string(ext.SpanKindRPCServerEnum), "consumer", "entry":
return "entry"
case string(ext.SpanKindRPCClientEnum), "producer", "exit":
return "exit"
}
return ""
}

func collectLogs(rawSpan basictracer.RawSpan) map[uint64]map[string]interface{} {
Expand All @@ -111,14 +144,19 @@ func collectLogs(rawSpan basictracer.RawSpan) map[uint64]map[string]interface{}

func (r *SpanRecorder) init() {
r.reset()
ticker := time.NewTicker(1 * time.Second)
go func() {
for range ticker.C {
log.debug("Sending spans to agent", len(r.spans))

r.send()
}
}()
if r.testMode {
log.debug("Recorder in test mode. Not reporting spans to the backend.")
} else {
ticker := time.NewTicker(1 * time.Second)
go func() {
for range ticker.C {
log.debug("Sending spans to agent", len(r.spans))

r.send()
}
}()
}
}

func (r *SpanRecorder) reset() {
Expand All @@ -129,27 +167,12 @@ func (r *SpanRecorder) reset() {

func (r *SpanRecorder) RecordSpan(rawSpan basictracer.RawSpan) {
var data = &Data{}
var tp string
h := getHostName(rawSpan)
status := getIntTag(rawSpan, string(ext.HTTPStatusCode))
if status >= 0 {
tp = getHTTPType(rawSpan)
data = &Data{HTTP: &HTTPData{
Host: h,
URL: getStringTag(rawSpan, string(ext.HTTPUrl)),
Method: getStringTag(rawSpan, string(ext.HTTPMethod)),
Status: status}}
} else {
log.debug("No HTTP status code provided or invalid status code, opting out to RPC")

tp = RPC
data = &Data{RPC: &RPCData{
Host: h,
Call: rawSpan.Operation}}
}
kind := getSpanKind(rawSpan)

data.Custom = &CustomData{Tags: rawSpan.Tags,
Logs: collectLogs(rawSpan)}
data.SDK = &SDKData{
Name: rawSpan.Operation,
Type: kind,
Custom: &CustomData{Tags: rawSpan.Tags, Logs: collectLogs(rawSpan)}}

baggage := make(map[string]string)
rawSpan.Context.ForeachBaggageItem(func(k string, v string) bool {
Expand All @@ -159,7 +182,7 @@ func (r *SpanRecorder) RecordSpan(rawSpan basictracer.RawSpan) {
})

if len(baggage) > 0 {
data.Baggage = baggage
data.SDK.Custom.Baggage = baggage
}

data.Service = getServiceName(rawSpan)
Expand All @@ -184,19 +207,19 @@ func (r *SpanRecorder) RecordSpan(rawSpan basictracer.RawSpan) {
SpanID: rawSpan.Context.SpanID,
Timestamp: uint64(rawSpan.Start.UnixNano()) / uint64(time.Millisecond),
Duration: uint64(rawSpan.Duration) / uint64(time.Millisecond),
Name: tp,
Name: "sdk",
From: sensor.agent.from,
Data: &data})

if len(r.spans) == sensor.options.ForceTransmissionStartingAt {
if !r.testMode && (len(r.spans) == sensor.options.ForceTransmissionStartingAt) {
log.debug("Forcing spans to agent", len(r.spans))

r.send()
}
}

func (r *SpanRecorder) send() {
if sensor.agent.canSend() {
if sensor.agent.canSend() && !r.testMode {
go func() {
_, err := sensor.agent.request(sensor.agent.makeURL(AgentTracesURL), "POST", r.spans)

Expand Down
Loading

0 comments on commit 78b5bce

Please sign in to comment.