Skip to content

Commit

Permalink
Merge pull request #75 from flashcatcloud/panic_resolver
Browse files Browse the repository at this point in the history
resolve panic and add some debug log
  • Loading branch information
UlricQin authored Jul 8, 2022
2 parents 2318558 + e83fac0 commit 8e15bcb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion agent/metrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (a *Agent) startMetricsAgent() error {
}

reader := NewInputReader(inp)
reader.Start()
reader.Start(name)
a.InputReaders[name] = reader

log.Println("I! input:", name, "started")
Expand Down
43 changes: 27 additions & 16 deletions agent/metrics_reader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package agent

import (
"fmt"
"log"
"strings"
"time"
Expand Down Expand Up @@ -33,21 +32,20 @@ func NewInputReader(in inputs.Input) *InputReader {
}
}

func (r *InputReader) Start() {
func (r *InputReader) Start(inputName string) {
// start consumer goroutines
go r.read()
go r.read(inputName)

// start collector instance
go r.startInput()
go r.startInput(inputName)
}

func (r *InputReader) Stop() {
r.quitChan <- struct{}{}
close(r.queue)
r.input.Drop()
}

func (r *InputReader) startInput() {
func (r *InputReader) startInput(inputName string) {
interval := config.GetInterval()
if r.input.GetInterval() > 0 {
interval = time.Duration(r.input.GetInterval())
Expand All @@ -62,22 +60,30 @@ func (r *InputReader) startInput() {
select {
case <-r.quitChan:
close(r.quitChan)
close(r.queue)
return
default:
time.Sleep(interval)
r.gatherOnce()
var start time.Time
if config.Config.DebugMode {
start = time.Now()
log.Println("D!", inputName, ": before gather once")
}

r.gatherOnce(inputName)

if config.Config.DebugMode {
ms := time.Since(start).Milliseconds()
log.Println("D!", inputName, ": after gather once,", "duration:", ms, "ms")
}
}
}
}

func (r *InputReader) gatherOnce() {
func (r *InputReader) gatherOnce(inputName string) {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
return
} else {
log.Println("E! gather metrics panic:", r, string(runtimex.Stack(3)))
}
log.Println("E!", inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
}
}()

Expand All @@ -88,12 +94,17 @@ func (r *InputReader) gatherOnce() {
// handle result
samples := slist.PopBackAll()

if len(samples) == 0 {
size := len(samples)
if size == 0 {
return
}

if config.Config.DebugMode {
log.Println("D!", inputName, ": gathered samples size:", size)
}

now := time.Now()
for i := 0; i < len(samples); i++ {
for i := 0; i < size; i++ {
if samples[i] == nil {
continue
}
Expand Down Expand Up @@ -139,7 +150,7 @@ func (r *InputReader) gatherOnce() {
}
}

func (r *InputReader) read() {
func (r *InputReader) read(inputName string) {
batch := config.Config.WriterOpt.Batch
if batch <= 0 {
batch = 2000
Expand Down
3 changes: 0 additions & 3 deletions writer/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package writer

import (
"fmt"
"log"
"net"
"net/http"
"sort"
Expand Down Expand Up @@ -73,8 +72,6 @@ func PostSeries(samples []*types.Sample) {
}

func printTestMetrics(samples []*types.Sample) {
log.Println(">> count:", len(samples))

for i := 0; i < len(samples); i++ {
var sb strings.Builder

Expand Down

0 comments on commit 8e15bcb

Please sign in to comment.