Skip to content

Commit

Permalink
fix:修复监控数据上报丢失方法
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Jul 19, 2024
1 parent babd816 commit 02a6e3f
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 25 deletions.
3 changes: 3 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
subSc, ok := pnp.readySCs[addr]
if ok {
reporter := &resultReporter{
method: info.FullMethodName,
instance: targetInstance,
consumerAPI: pnp.balancer.consumerAPI,
startTime: time.Now(),
Expand Down Expand Up @@ -543,6 +544,7 @@ func collectRouteLabels(routings []*traffic_manage.Route) []string {
}

type resultReporter struct {
method string
instance model.Instance
consumerAPI polaris.ConsumerAPI
startTime time.Time
Expand All @@ -559,6 +561,7 @@ func (r *resultReporter) report(info balancer.DoneInfo) {
callResult.CalledInstance = r.instance
callResult.RetStatus = retStatus
callResult.SourceService = r.sourceService
callResult.SetMethod(r.method)
callResult.SetDelay(time.Since(r.startTime))
callResult.SetRetCode(int32(code))
if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil {
Expand Down
14 changes: 5 additions & 9 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,22 @@ import (
)

const (
listenPort = 16011
listenPort = 18080
)

func main() {
// grpc客户端连接获取
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

polaris.GetLogger().SetLevel(polaris.LogDebug)

conn, err := polaris.DialContext(ctx, "polaris://QuickStartEchoServerGRPC",
polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
polaris.WithDisableRouter(),
polaris.WithDisableCircuitBreaker(),
)

conn.Close()

if err != nil {
log.Fatal(err)
}
defer conn.Close()
echoClient := pb.NewEchoServerClient(conn)

indexHandler := func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -97,10 +91,10 @@ func main() {
log.Fatal(err)
}
}()
runMainLoop()
runMainLoop(conn, cancel)
}

func runMainLoop() {
func runMainLoop(conn *grpc.ClientConn, cancel context.CancelFunc) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
syscall.SIGINT, syscall.SIGTERM,
Expand All @@ -109,6 +103,8 @@ func runMainLoop() {

for s := range ch {
log.Printf("catch signal(%+v), stop servers", s)
cancel()
conn.Close()
polaris.ClosePolarisContext()
return
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/polarismesh/polaris-go v1.6.0-alpha.7
github.com/polarismesh/polaris-go v1.6.0-alpha.8
github.com/polarismesh/specification v1.5.1
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1695,8 +1695,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarismesh/polaris-go v1.6.0-alpha.7 h1:cWyx5BC6+JurBNCV3EaeCjd6U8edY6KrG6HkkBhWMR0=
github.com/polarismesh/polaris-go v1.6.0-alpha.7/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/polaris-go v1.6.0-alpha.8 h1:KzANbn7gumZLfbJEA1KavDiFBqlDKxeMVS3eTxZXFR0=
github.com/polarismesh/polaris-go v1.6.0-alpha.8/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/polarismesh/specification v1.5.1 h1:cJ2m0RBepdopGo/e3UpKdsab3NpDZnw5IsVTB1sFc5I=
github.com/polarismesh/specification v1.5.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
Expand Down
75 changes: 63 additions & 12 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package grpcpolaris

import (
"fmt"
"log"
"runtime"
"sync/atomic"
"time"

"github.com/natefinch/lumberjack"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type LogLevel int
Expand All @@ -35,6 +38,21 @@ const (
LogError
)

func (l LogLevel) String() string {
switch l {
case LogDebug:
return "[debug]"
case LogInfo:
return "[info]"
case LogWarn:
return "[warn]"
case LogError:
return "[error]"
default:
return ""
}
}

var _log Logger = newDefaultLogger()

func SetLogger(logger Logger) {
Expand All @@ -54,25 +72,35 @@ type Logger interface {
}

type defaultLogger struct {
writer *log.Logger
writer zapcore.Core
levelRef atomic.Value
}

func newDefaultLogger() *defaultLogger {
lumberJackLogger := &lumberjack.Logger{
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
TimeKey: "time",
NameKey: "name",
CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
w := zapcore.AddSync(&lumberjack.Logger{
Filename: "./logs/grpc-go-polaris.log", // 文件位置
MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: 7, // 保留旧文件的最大天数
MaxBackups: 100, // 保留旧文件的最大个数
Compress: true, // 是否压缩/归档旧文件
}
})

levelRef := atomic.Value{}

levelRef.Store(LogInfo)
core := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderCfg), w, zap.InfoLevel)
return &defaultLogger{
writer: log.New(lumberJackLogger, "", log.Lshortfile|log.Ldate|log.Ltime),
levelRef: levelRef,
writer: core,
}
}

Expand All @@ -98,9 +126,32 @@ func (l *defaultLogger) Error(format string, args ...interface{}) {
}

func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) {
curLevel := l.levelRef.Load().(LogLevel)
if curLevel > expectLevel {
zapL := func() zapcore.Level {
switch expectLevel {
case LogDebug:
return zapcore.DebugLevel
case LogInfo:
return zapcore.InfoLevel
case LogWarn:
return zapcore.WarnLevel
case LogError:
return zapcore.ErrorLevel
default:
return zapcore.InfoLevel
}
}()

if !l.writer.Enabled(zapL) {
return
}
_ = l.writer.Output(3, fmt.Sprintf(format, args...))

msg := fmt.Sprintf(format, args...)
e := zapcore.Entry{
Message: msg,
Level: zapL,
Time: time.Now(),
}

e.Caller = zapcore.NewEntryCaller(runtime.Caller(2))
_ = l.writer.Write(e, nil)
}
2 changes: 1 addition & 1 deletion resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (pr *polarisNamingResolver) watcher() {
for {
select {
case <-pr.ctx.Done():
GetLogger().Info("[Polaris][Resolver] exist watch instance change event for namespace=%s service=%s: %v",
GetLogger().Info("[Polaris][Resolver] exit watch instance change event for namespace=%s service=%s: %v",
pr.options.Namespace, pr.host)
return
case <-pr.eventCh:
Expand Down

0 comments on commit 02a6e3f

Please sign in to comment.