-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathagent.go
277 lines (225 loc) · 7.01 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2016
package instana
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/instana/go-sensor/acceptor"
"github.com/instana/go-sensor/autoprofile"
)
var payloadTooLargeErr = errors.New(`request payload is too large`)
const (
agentDiscoveryURL = "/com.instana.plugin.golang.discovery"
agentTracesURL = "/com.instana.plugin.golang/traces."
agentDataURL = "/com.instana.plugin.golang."
agentEventURL = "/com.instana.plugin.generic.event"
agentProfilesURL = "/com.instana.plugin.golang/profiles."
agentDefaultHost = "localhost"
agentDefaultPort = 42699
agentHeader = "Instana Agent"
// SnapshotPeriod is the amount of time in seconds between snapshot reports.
SnapshotPeriod = 600
snapshotCollectionInterval = SnapshotPeriod * time.Second
announceTimeout = 15 * time.Second
clientTimeout = 5 * time.Second
maxContentLength = 1024 * 1024 * 5
numberOfBigSpansToLog = 5
)
type agentResponse struct {
Pid uint32 `json:"pid"`
HostID string `json:"agentUuid"`
Secrets struct {
Matcher string `json:"matcher"`
List []string `json:"list"`
} `json:"secrets"`
ExtraHTTPHeaders []string `json:"extraHeaders"`
Tracing struct {
ExtraHTTPHeaders []string `json:"extra-http-headers"`
} `json:"tracing"`
}
func (a *agentResponse) getExtraHTTPHeaders() []string {
if len(a.Tracing.ExtraHTTPHeaders) == 0 {
return a.ExtraHTTPHeaders
}
return a.Tracing.ExtraHTTPHeaders
}
type discoveryS struct {
PID int `json:"pid"`
Name string `json:"name"`
Args []string `json:"args"`
Fd string `json:"fd"`
Inode string `json:"inode"`
CPUSetFileContent string `json:"cpuSetFileContent"`
}
type fromS struct {
EntityID string `json:"e"`
// Serverless agents fields
Hostless bool `json:"hl,omitempty"`
CloudProvider string `json:"cp,omitempty"`
// Host agent fields
HostID string `json:"h,omitempty"`
}
func newServerlessAgentFromS(entityID, provider string) *fromS {
return &fromS{
EntityID: entityID,
Hostless: true,
CloudProvider: provider,
}
}
type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
type agentS struct {
// agentComm encapsulates info about the agent host and fromS. This is a shared information between the agent and
// the fsm layer, so we use this wrapper to prevent passing data from one side to the other in a more sophisticated
// way.
agentComm *agentCommunicator
port string
mu sync.RWMutex
fsm *fsmS
snapshot *SnapshotCollector
logger LeveledLogger
printPayloadTooLargeErrInfoOnce sync.Once
}
func newAgent(serviceName, host string, port int, logger LeveledLogger) *agentS {
if logger == nil {
logger = defaultLogger
}
logger.Debug("initializing agent")
agent := &agentS{
agentComm: newAgentCommunicator(host, strconv.Itoa(port), &fromS{}, logger),
port: strconv.Itoa(port),
snapshot: &SnapshotCollector{
CollectionInterval: snapshotCollectionInterval,
ServiceName: serviceName,
},
logger: logger,
}
agent.mu.Lock()
agent.fsm = newFSM(agent.agentComm, logger)
agent.mu.Unlock()
return agent
}
// Ready returns whether the agent has finished the announcement and is ready to send data
func (agent *agentS) Ready() bool {
agent.mu.RLock()
defer agent.mu.RUnlock()
return agent.fsm.fsm.Current() == "ready"
}
// SendMetrics sends collected entity data to the host agent
func (agent *agentS) SendMetrics(data acceptor.Metrics) error {
pid, err := strconv.Atoi(agent.agentComm.from.EntityID)
if err != nil && agent.agentComm.from.EntityID != "" {
agent.logger.Debug("agent got malformed PID %q", agent.agentComm.from.EntityID)
}
if err := agent.agentComm.sendDataToAgent(agentDataURL, acceptor.GoProcessData{
PID: pid,
Snapshot: agent.snapshot.Collect(),
Metrics: data,
}); err != nil {
if err == payloadTooLargeErr {
agent.logger.Warn(`A batch of spans has been rejected because it is too large to be sent to the agent.`)
}
agent.logger.Error("failed to send metrics to the host agent: ", err)
agent.reset()
return err
}
return nil
}
// SendEvent sends an event using Instana Events API
func (agent *agentS) SendEvent(event *EventData) error {
err := agent.agentComm.sendDataToAgent(agentEventURL, event)
if err != nil {
if err == payloadTooLargeErr {
agent.logger.Warn(`A batch of spans has been rejected because it is too large to be sent to the agent.`)
}
// do not reset the agent as it might be not initialized at this state yet
agent.logger.Warn("failed to send event ", event.Title, " to the host agent: ", err)
return err
}
return nil
}
// SendSpans sends collected spans to the host agent
func (agent *agentS) SendSpans(spans []Span) error {
for i := range spans {
spans[i].From = agent.agentComm.from
}
err := agent.agentComm.sendDataToAgent(agentTracesURL, spans)
if err != nil {
if err == payloadTooLargeErr {
agent.printPayloadTooLargeErrInfoOnce.Do(
func() {
agent.logDetailedInformationAboutDroppedSpans(numberOfBigSpansToLog, spans, err)
},
)
return nil
} else {
agent.logger.Error("failed to send spans to the host agent: ", err)
agent.reset()
}
return err
}
return nil
}
// Flush is a noop for host agent
func (agent *agentS) Flush(ctx context.Context) error { return nil }
type hostAgentProfile struct {
autoprofile.Profile
ProcessID string `json:"pid"`
}
// SendProfiles sends profile data to the agent
func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error {
agentProfiles := make([]hostAgentProfile, 0, len(profiles))
for _, p := range profiles {
agentProfiles = append(agentProfiles, hostAgentProfile{p, agent.agentComm.from.EntityID})
}
err := agent.agentComm.sendDataToAgent(agentProfilesURL, agentProfiles)
if err != nil {
if err == payloadTooLargeErr {
agent.logger.Warn(`A batch of spans has been rejected because it is too large to be sent to the agent.`)
}
agent.logger.Error("failed to send profile data to the host agent: ", err)
agent.reset()
return err
}
return nil
}
func (agent *agentS) setLogger(l LeveledLogger) {
agent.logger = l
}
func (agent *agentS) reset() {
agent.mu.Lock()
agent.fsm.reset()
agent.mu.Unlock()
}
func (agent *agentS) logDetailedInformationAboutDroppedSpans(size int, spans []Span, err error) {
var marshaledSpans []string
for i := range spans {
ms, err := json.Marshal(spans[i])
if err == nil {
marshaledSpans = append(marshaledSpans, string(ms))
}
}
sort.Slice(marshaledSpans, func(i, j int) bool {
// descending order
return len(marshaledSpans[i]) > len(marshaledSpans[j])
})
if size > len(marshaledSpans) {
size = len(marshaledSpans)
}
agent.logger.Warn(
fmt.Sprintf("failed to send spans to the host agent: dropped %d span(s) : %s.\nThis detailed information will only be logged once.\nSpans :\n %s",
len(spans),
err.Error(),
strings.Join(marshaledSpans[:size], ";"),
),
)
}