From 6b214bd54439240626ee7ed1b32b256e9a79ea3c Mon Sep 17 00:00:00 2001 From: zhengwei Date: Wed, 3 Feb 2021 19:38:40 +0800 Subject: [PATCH 1/4] feat: add prometheus metric && support send logs for concurrency && add fallback logger --- core/elog/ali/log_project.go | 20 ++----- core/elog/ali/option.go | 6 ++ core/elog/ali/request.go | 4 -- core/elog/ali/writer.go | 104 +++++++++++++++++++------------- core/elog/component.go | 108 +++++++++++++++++++--------------- core/elog/config.go | 22 ++++--- core/emetric/metric.go | 10 +--- core/emetric/summary.go | 20 ++++--- server/egovernor/component.go | 5 ++ 9 files changed, 166 insertions(+), 133 deletions(-) diff --git a/core/elog/ali/log_project.go b/core/elog/ali/log_project.go index 67a21a18..23a53eef 100755 --- a/core/elog/ali/log_project.go +++ b/core/elog/ali/log_project.go @@ -2,11 +2,12 @@ package ali import ( "encoding/json" + "errors" "fmt" "net/http" - "net/url" "strconv" "strings" + "time" "github.com/go-resty/resty/v2" "github.com/golang/protobuf/proto" @@ -80,7 +81,7 @@ func (p *LogProject) ListLogStore() (storeNames []string, err error) { return } -func (p *LogProject) parseEndpoint() { +func (p *LogProject) initHost() { scheme := httpScheme // default to http scheme host := p.endpoint @@ -91,18 +92,6 @@ func (p *LogProject) parseEndpoint() { host = strings.TrimPrefix(p.endpoint, scheme) } - if ipRegex.MatchString(host) { - // use direct ip proxy - u, err := url.Parse(fmt.Sprintf("%s%s", scheme, host)) - if err != nil { - return - } - cli := p.cli.GetClient() - cli.Transport = &http.Transport{ - Proxy: http.ProxyURL(u), - } - p.cli = resty.NewWithClient(cli) - } if p.name == "" { p.host = fmt.Sprintf("%s%s", scheme, host) } else { @@ -137,6 +126,9 @@ func (p *LogProject) GetLogStore(name string) (s *LogStore, err error) { // PutLogs puts logs into logstore. // The callers should transform user logs into LogGroup. func (s *LogStore) PutLogs(lg *pb.LogGroup) (err error) { + time.Sleep(50 * time.Millisecond) + return errors.New("mock") + body, err := proto.Marshal(lg) if err != nil { return diff --git a/core/elog/ali/option.go b/core/elog/ali/option.go index 53e467e1..b7e84a2f 100644 --- a/core/elog/ali/option.go +++ b/core/elog/ali/option.go @@ -91,3 +91,9 @@ func WithApiRetryMaxWaitTime(apiRetryMaxWaitTime time.Duration) Option { c.apiRetryMaxWaitTime = apiRetryMaxWaitTime } } + +func WithFallbackCore(core zapcore.Core) Option { + return func(c *config) { + c.fallbackCore = core + } +} diff --git a/core/elog/ali/request.go b/core/elog/ali/request.go index 2ea06d4e..c7141624 100755 --- a/core/elog/ali/request.go +++ b/core/elog/ali/request.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "net/http" - "regexp" "github.com/go-resty/resty/v2" ) @@ -15,11 +14,8 @@ const requestIDHeader = "x-log-requestid" const ( httpScheme = "http://" httpsScheme = "https://" - ipRegexStr = `\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}.*` ) -var ipRegex = regexp.MustCompile(ipRegexStr) - type badResError struct { body string header map[string][]string diff --git a/core/elog/ali/writer.go b/core/elog/ali/writer.go index 2bec5383..df9e1c83 100644 --- a/core/elog/ali/writer.go +++ b/core/elog/ali/writer.go @@ -4,21 +4,26 @@ import ( "context" "fmt" "log" + "math" "sync/atomic" "time" "github.com/go-resty/resty/v2" "github.com/golang/protobuf/proto" + "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/gotomicro/ego/core/eapp" "github.com/gotomicro/ego/core/elog/ali/pb" + "github.com/gotomicro/ego/core/emetric" "github.com/gotomicro/ego/core/util/xcast" ) const ( - // flushSize sets the flush size - flushSize int = 128 + // entryChanSize sets the logs size + entryChanSize int = 4096 + // observe interval + observeInterval = 5 * time.Second ) type LogContent = pb.Log_Content @@ -42,17 +47,17 @@ type config struct { apiRetryCount int apiRetryWaitTime time.Duration apiRetryMaxWaitTime time.Duration + fallbackCore zapcore.Core } // writer implements LoggerInterface. // Writes messages in keep-live tcp connection. type writer struct { - store *LogStore - group []*pb.LogGroup - withMap bool - ch chan *pb.Log - curBufSize *int32 - cancel context.CancelFunc + fallbackCore zapcore.Core + store *LogStore + ch chan *pb.Log + curBufSize *int32 + cancel context.CancelFunc config } @@ -66,14 +71,18 @@ func retryCondition(r *resty.Response, err error) bool { // newWriter creates a new ali writer func newWriter(c config) (*writer, error) { - w := &writer{config: c, ch: make(chan *pb.Log, c.apiBulkSize), curBufSize: new(int32)} + entryChanSize := entryChanSize + if c.apiBulkSize >= entryChanSize { + c.apiBulkSize = entryChanSize + } + w := &writer{config: c, ch: make(chan *pb.Log, entryChanSize), curBufSize: new(int32)} p := &LogProject{ name: w.project, endpoint: w.endpoint, accessKeyID: w.accessKeyID, accessKeySecret: w.accessKeySecret, } - p.parseEndpoint() + p.initHost() p.cli = resty.New(). SetDebug(eapp.IsDevelopmentMode()). SetHostURL(p.host). @@ -87,25 +96,9 @@ func newWriter(c config) (*writer, error) { return nil, fmt.Errorf("getlogstroe fail,%w", err) } w.store = store - - // Create default Log Group - w.group = append(w.group, &pb.LogGroup{ - Topic: proto.String(""), - Source: proto.String(w.source), - Logs: make([]*pb.Log, 0, w.apiBulkSize), - }) - - // Create other Log Group - for _, topic := range w.topics { - lg := &pb.LogGroup{ - Topic: proto.String(topic), - Source: proto.String(w.source), - Logs: make([]*pb.Log, 0, w.apiBulkSize), - } - w.group = append(w.group, lg) - } - - w.Sync() + w.fallbackCore = c.fallbackCore + w.sync() + w.observe() return w, nil } @@ -135,28 +128,50 @@ func (w *writer) write(fields map[string]interface{}) (err error) { } func (w *writer) flush() error { - // TODO sync pool - var lg = *w.group[0] - chlen := len(w.ch) - if chlen == 0 { + entriesChLen := len(w.ch) + if entriesChLen == 0 { return nil } - var logs = make([]*pb.Log, 0, chlen) - logs = append(logs, <-w.ch) + + var waitedEntries = make([]*pb.Log, 0, entriesChLen) + waitedEntries = append(waitedEntries, <-w.ch) L1: - for { + for i := 0; i < entriesChLen; i++ { select { case l := <-w.ch: - logs = append(logs, l) + waitedEntries = append(waitedEntries, l) default: break L1 } } - lg.Logs = logs - return w.store.PutLogs(&lg) + + chunks := int(math.Ceil(float64(len(waitedEntries)) / float64(w.apiBulkSize))) + for i := 0; i < chunks; i++ { + go func(start int) { + end := (start + 1) * w.apiBulkSize + if end > len(waitedEntries) { + end = len(waitedEntries) + } + lg := pb.LogGroup{Logs: waitedEntries[start:end]} + if e := w.store.PutLogs(&lg); e != nil { + // if error occurs we put logs with fallbackCore logger + for _, v := range lg.Logs { + fields := make([]zapcore.Field, len(v.Contents), len(v.Contents)) + for i, val := range v.Contents { + fields[i] = zap.String(val.GetKey(), val.GetValue()) + } + if e := w.fallbackCore.Write(zapcore.Entry{Time: time.Now()}, fields); e != nil { + log.Println("fallbackCore write fail", e) + } + } + } + }(i) + } + + return nil } -func (w *writer) Sync() { +func (w *writer) sync() { ctx, cancel := context.WithCancel(context.Background()) w.cancel = cancel ticker := time.NewTicker(w.flushBufferInterval) @@ -175,3 +190,12 @@ func (w *writer) Sync() { }() return } + +func (w *writer) observe() { + go func() { + for { + emetric.LibHandleSummary.Observe(float64(len(w.ch)), "elog", "ali_waited_entries") + time.Sleep(observeInterval) + } + }() +} diff --git a/core/elog/component.go b/core/elog/component.go index 14f8ce26..1b87fb6f 100644 --- a/core/elog/component.go +++ b/core/elog/component.go @@ -8,11 +8,12 @@ import ( "strings" "time" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "github.com/gotomicro/ego/core/econf" "github.com/gotomicro/ego/core/elog/ali" "github.com/gotomicro/ego/core/util/xcolor" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) const ( @@ -76,56 +77,72 @@ var ( ByteString = zap.ByteString ) -func newCore(config *Config, lv zap.AtomicLevel) (zapcore.Core, CloseFunc) { - core := config.core - var asyncStopFunc CloseFunc +const ( + defaultAliFallbackCorePath = "ali.log" +) - encoderConfig := *config.encoderConfig - if config.Writer == writerRotateFile { - // Debug output to console and file by default - var ws = zapcore.AddSync(newRotate(config)) - if config.Debug { - ws = zap.CombineWriteSyncers(os.Stdout, ws) - } - if config.EnableAsync { - ws, asyncStopFunc = Buffer(ws, config.FlushBufferSize, config.FlushBufferInterval) +func newRotateFileCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, asyncStopFunc CloseFunc) { + // Debug output to console and file by default + var ws = zapcore.AddSync(newRotate(config)) + if config.Debug { + ws = zap.CombineWriteSyncers(os.Stdout, ws) + } + if config.EnableAsync { + ws, asyncStopFunc = Buffer(ws, config.FlushBufferSize, config.FlushBufferInterval) + } + core = zapcore.NewCore( + func() zapcore.Encoder { + if config.Debug { + return zapcore.NewConsoleEncoder(*config.encoderConfig) + } + return zapcore.NewJSONEncoder(*config.encoderConfig) + }(), + ws, + lv, + ) + return core, asyncStopFunc +} + +func newAliCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, asyncStopFunc CloseFunc) { + c := *config + c.Name = defaultAliFallbackCorePath + fallbackCore, fallbackCoreCf := newRotateFileCore(&c, lv) + core, cf := ali.NewCore( + ali.WithEncoder(ali.NewMapObjEncoder(*config.encoderConfig)), + ali.WithEndpoint(config.AliEndpoint), + ali.WithAccessKeyID(config.AliAccessKeyID), + ali.WithAccessKeySecret(config.AliAccessKeySecret), + ali.WithProject(config.AliProject), + ali.WithLogstore(config.AliLogstore), + ali.WithLevelEnabler(lv), + ali.WithFlushBufferSize(config.FlushBufferSize), + ali.WithFlushBufferInterval(config.FlushBufferInterval), + ali.WithApiBulkSize(config.AliApiBulkSize), + ali.WithApiTimeout(config.AliApiTimeout), + ali.WithApiRetryCount(config.AliApiRetryCount), + ali.WithApiRetryWaitTime(config.AliApiRetryWaitTime), + ali.WithApiRetryMaxWaitTime(config.AliApiRetryMaxWaitTime), + ali.WithFallbackCore(fallbackCore), + ) + return core, func() error { + var err error + if e := cf(); e != nil { + err = fmt.Errorf("exec close func fail, %w ", err) } - - if core == nil { - core = zapcore.NewCore( - func() zapcore.Encoder { - if config.Debug { - return zapcore.NewConsoleEncoder(encoderConfig) - } - return zapcore.NewJSONEncoder(encoderConfig) - }(), - ws, - lv, - ) + if err := fallbackCoreCf(); err != nil { + err = fmt.Errorf("exec fallbackCore close func fail, %w", err) } - return core, asyncStopFunc + return err } +} +func newCore(config *Config, lv zap.AtomicLevel) (zapcore.Core, CloseFunc) { + if config.Writer == writerRotateFile { + return newRotateFileCore(config, lv) + } if config.Writer == writerAliSLS { - core, asyncStopFunc = ali.NewCore( - ali.WithEncoder(ali.NewMapObjEncoder(encoderConfig)), - ali.WithEndpoint(config.AliEndpoint), - ali.WithAccessKeyID(config.AliAccessKeyID), - ali.WithAccessKeySecret(config.AliAccessKeySecret), - ali.WithProject(config.AliProject), - ali.WithLogstore(config.AliLogstore), - ali.WithLevelEnabler(lv), - ali.WithFlushBufferSize(config.FlushBufferSize), - ali.WithFlushBufferInterval(config.FlushBufferInterval), - ali.WithApiBulkSize(config.AliApiBulkSize), - ali.WithApiTimeout(config.AliApiTimeout), - ali.WithApiRetryCount(config.AliApiRetryCount), - ali.WithApiRetryWaitTime(config.AliApiRetryWaitTime), - ali.WithApiRetryMaxWaitTime(config.AliApiRetryMaxWaitTime), - ) - return core, nil + return newAliCore(config, lv) } - return nil, nil } @@ -436,7 +453,6 @@ func panicDetail(msg string, fields ...Field) { for key, val := range enc.Fields { fmt.Printf(" %s: %s\n", xcolor.Red(key), fmt.Sprintf("%+v", val)) } - } // With ... diff --git a/core/elog/config.go b/core/elog/config.go index 1bf58c8f..56fd773e 100644 --- a/core/elog/config.go +++ b/core/elog/config.go @@ -25,21 +25,19 @@ type Config struct { FlushBufferSize int // 缓冲大小,默认256 * 1024B FlushBufferInterval time.Duration // 缓冲时间,默认5秒 Writer string // 使用哪种Writer,默认使用fileWriter - AliAccessKeyID string // [aliWriter]阿里云sls AKID - AliAccessKeySecret string // [aliWriter]阿里云sls AKSecret - AliEndpoint string // [aliWriter]阿里云sls endpoint - AliProject string // [aliWriter]阿里云sls Project名称 - AliLogstore string // [aliWriter]阿里云sls logstore名称 - AliTopic string // [aliWriter]阿里云sls logstore名称 - AliApiBulkSize int // [aliWriter]阿里云sls API单次请求发送最大日志条数 - AliApiTimeout time.Duration // [aliWriter]阿里云sls API接口超时 - AliApiRetryCount int // [aliWriter]阿里云sls API接口重试次数 - AliApiRetryWaitTime time.Duration // [aliWriter]阿里云sls API接口重试默认等待间隔 - AliApiRetryMaxWaitTime time.Duration // [aliWriter]阿里云sls API接口重试最大等待间隔 + AliAccessKeyID string // [aliWriter]阿里云sls AKID,必填 + AliAccessKeySecret string // [aliWriter]阿里云sls AKSecret,必填 + AliEndpoint string // [aliWriter]阿里云sls endpoint,必填 + AliProject string // [aliWriter]阿里云sls Project名称,必填 + AliLogstore string // [aliWriter]阿里云sls logstore名称,必填 + AliApiBulkSize int // [aliWriter]阿里云sls API单次请求发送最大日志条数,默认256条 + AliApiTimeout time.Duration // [aliWriter]阿里云sls API接口超时,默认3秒 + AliApiRetryCount int // [aliWriter]阿里云sls API接口重试次数,默认3次 + AliApiRetryWaitTime time.Duration // [aliWriter]阿里云sls API接口重试默认等待间隔,默认1秒 + AliApiRetryMaxWaitTime time.Duration // [aliWriter]阿里云sls API接口重试最大等待间隔,默认3秒 fields []zap.Field // 日志初始化字段 callerSkip int - core zapcore.Core encoderConfig *zapcore.EncoderConfig configKey string } diff --git a/core/emetric/metric.go b/core/emetric/metric.go index ef2413d1..cbfa953b 100644 --- a/core/emetric/metric.go +++ b/core/emetric/metric.go @@ -1,11 +1,9 @@ package emetric import ( - "github.com/gotomicro/ego/core/eapp" - "github.com/gotomicro/ego/server/egovernor" - "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http" "time" + + "github.com/gotomicro/ego/core/eapp" ) var ( @@ -137,8 +135,4 @@ func init() { eapp.BuildTime(), eapp.GoVersion(), ).Set(float64(time.Now().UnixNano() / 1e6)) - - egovernor.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { - promhttp.Handler().ServeHTTP(w, r) - }) } diff --git a/core/emetric/summary.go b/core/emetric/summary.go index cd360f0a..57b3f816 100644 --- a/core/emetric/summary.go +++ b/core/emetric/summary.go @@ -4,11 +4,12 @@ import "github.com/prometheus/client_golang/prometheus" // SummaryVecOpts ... type SummaryVecOpts struct { - Namespace string - Subsystem string - Name string - Help string - Labels []string + Namespace string + Subsystem string + Name string + Help string + Objectives map[float64]float64 + Labels []string } type summaryVec struct { @@ -19,10 +20,11 @@ type summaryVec struct { func (opts SummaryVecOpts) Build() *summaryVec { vec := prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Namespace: opts.Namespace, - Subsystem: opts.Subsystem, - Name: opts.Name, - Help: opts.Help, + Namespace: opts.Namespace, + Subsystem: opts.Subsystem, + Name: opts.Name, + Help: opts.Help, + Objectives: opts.Objectives, }, opts.Labels) prometheus.MustRegister(vec) return &summaryVec{ diff --git a/server/egovernor/component.go b/server/egovernor/component.go index b7e21276..8ccb6e25 100644 --- a/server/egovernor/component.go +++ b/server/egovernor/component.go @@ -9,6 +9,8 @@ import ( "github.com/gotomicro/ego/core/elog" "github.com/gotomicro/ego/server" jsoniter "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "net/http" "net/http/pprof" @@ -115,6 +117,9 @@ func (c *Component) Init() error { //Serve .. func (s *Component) Start() error { + HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + promhttp.Handler().ServeHTTP(w, r) + }) err := s.Server.Serve(s.listener) if err == http.ErrServerClosed { return nil From 420de944fbf9466f97185cebc175edb2490410d8 Mon Sep 17 00:00:00 2001 From: zhengwei Date: Wed, 3 Feb 2021 19:40:30 +0800 Subject: [PATCH 2/4] feat: logger - remove useless code --- core/elog/ali/log_project.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/elog/ali/log_project.go b/core/elog/ali/log_project.go index 23a53eef..ef2df536 100755 --- a/core/elog/ali/log_project.go +++ b/core/elog/ali/log_project.go @@ -2,12 +2,10 @@ package ali import ( "encoding/json" - "errors" "fmt" "net/http" "strconv" "strings" - "time" "github.com/go-resty/resty/v2" "github.com/golang/protobuf/proto" @@ -126,9 +124,6 @@ func (p *LogProject) GetLogStore(name string) (s *LogStore, err error) { // PutLogs puts logs into logstore. // The callers should transform user logs into LogGroup. func (s *LogStore) PutLogs(lg *pb.LogGroup) (err error) { - time.Sleep(50 * time.Millisecond) - return errors.New("mock") - body, err := proto.Marshal(lg) if err != nil { return From d558e9eaa79f9a71b3d41f127de1734886d9c182 Mon Sep 17 00:00:00 2001 From: zhengwei Date: Thu, 4 Feb 2021 10:41:19 +0800 Subject: [PATCH 3/4] fix: fix chan race && optimize fallback code --- core/elog/ali/writer.go | 42 +++++++++++++++++++++-------------------- core/elog/component.go | 23 +++++++++++----------- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/core/elog/ali/writer.go b/core/elog/ali/writer.go index df9e1c83..b4abe0a6 100644 --- a/core/elog/ali/writer.go +++ b/core/elog/ali/writer.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "math" + "sync" "sync/atomic" "time" @@ -56,6 +57,7 @@ type writer struct { fallbackCore zapcore.Core store *LogStore ch chan *pb.Log + lock sync.Mutex curBufSize *int32 cancel context.CancelFunc config @@ -120,7 +122,8 @@ func (w *writer) write(fields map[string]interface{}) (err error) { l := genLog(fields) // if bufferSize bigger then defaultBufferSize or channel is full, then flush logs w.ch <- l - atomic.AddInt32(w.curBufSize, int32(l.XXX_Size())) + atomic.AddInt32(w.curBu{"lv":"info","ts":1612351917,"msg":"","lv":"info","ts":"1612351912","msg":"oapms-be test","index":"5"} + fSize, int32(l.XXX_Size())) if atomic.LoadInt32(w.curBufSize) >= w.flushBufferSize || len(w.ch) >= cap(w.ch) { err = w.flush() } @@ -128,22 +131,17 @@ func (w *writer) write(fields map[string]interface{}) (err error) { } func (w *writer) flush() error { + w.lock.Lock() entriesChLen := len(w.ch) if entriesChLen == 0 { + w.lock.Unlock() return nil } - var waitedEntries = make([]*pb.Log, 0, entriesChLen) - waitedEntries = append(waitedEntries, <-w.ch) -L1: for i := 0; i < entriesChLen; i++ { - select { - case l := <-w.ch: - waitedEntries = append(waitedEntries, l) - default: - break L1 - } + waitedEntries = append(waitedEntries, <-w.ch) } + w.lock.Unlock() chunks := int(math.Ceil(float64(len(waitedEntries)) / float64(w.apiBulkSize))) for i := 0; i < chunks; i++ { @@ -154,16 +152,8 @@ L1: } lg := pb.LogGroup{Logs: waitedEntries[start:end]} if e := w.store.PutLogs(&lg); e != nil { - // if error occurs we put logs with fallbackCore logger - for _, v := range lg.Logs { - fields := make([]zapcore.Field, len(v.Contents), len(v.Contents)) - for i, val := range v.Contents { - fields[i] = zap.String(val.GetKey(), val.GetValue()) - } - if e := w.fallbackCore.Write(zapcore.Entry{Time: time.Now()}, fields); e != nil { - log.Println("fallbackCore write fail", e) - } - } + // if error occurs we put logs to fallback logger + w.writeToFallbackLogger(lg) } }(i) } @@ -171,6 +161,18 @@ L1: return nil } +func (w *writer) writeToFallbackLogger(lg pb.LogGroup) { + for _, v := range lg.Logs { + fields := make([]zapcore.Field, len(v.Contents), len(v.Contents)) + for i, val := range v.Contents { + fields[i] = zap.String(val.GetKey(), val.GetValue()) + } + if e := w.fallbackCore.Write(zapcore.Entry{Time: time.Now()}, fields); e != nil { + log.Println("fallbackCore write fail", e) + } + } +} + func (w *writer) sync() { ctx, cancel := context.WithCancel(context.Background()) w.cancel = cancel diff --git a/core/elog/component.go b/core/elog/component.go index 1b87fb6f..1243526f 100644 --- a/core/elog/component.go +++ b/core/elog/component.go @@ -81,14 +81,15 @@ const ( defaultAliFallbackCorePath = "ali.log" ) -func newRotateFileCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, asyncStopFunc CloseFunc) { +// newRotateFileCore construct a rotate file zapcore.Core +func newRotateFileCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, cf CloseFunc) { // Debug output to console and file by default var ws = zapcore.AddSync(newRotate(config)) if config.Debug { ws = zap.CombineWriteSyncers(os.Stdout, ws) } if config.EnableAsync { - ws, asyncStopFunc = Buffer(ws, config.FlushBufferSize, config.FlushBufferInterval) + ws, cf = Buffer(ws, config.FlushBufferSize, config.FlushBufferInterval) } core = zapcore.NewCore( func() zapcore.Encoder { @@ -100,14 +101,15 @@ func newRotateFileCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, a ws, lv, ) - return core, asyncStopFunc + return core, cf } -func newAliCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, asyncStopFunc CloseFunc) { +// newAliCore construct a ali SLS zapcore.Core +func newAliCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, cf CloseFunc) { c := *config c.Name = defaultAliFallbackCorePath fallbackCore, fallbackCoreCf := newRotateFileCore(&c, lv) - core, cf := ali.NewCore( + core, cf = ali.NewCore( ali.WithEncoder(ali.NewMapObjEncoder(*config.encoderConfig)), ali.WithEndpoint(config.AliEndpoint), ali.WithAccessKeyID(config.AliAccessKeyID), @@ -124,15 +126,14 @@ func newAliCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, asyncSto ali.WithApiRetryMaxWaitTime(config.AliApiRetryMaxWaitTime), ali.WithFallbackCore(fallbackCore), ) - return core, func() error { - var err error + return core, func() (err error) { if e := cf(); e != nil { - err = fmt.Errorf("exec close func fail, %w ", err) + err = fmt.Errorf("exec close func fail, %w ", e) } - if err := fallbackCoreCf(); err != nil { - err = fmt.Errorf("exec fallbackCore close func fail, %w", err) + if e := fallbackCoreCf(); e != nil { + err = fmt.Errorf("exec fallbackCore close func fail, %w", e) } - return err + return } } From 9a39cd237a1100445e13e1e76b6e5ee76fc41d79 Mon Sep 17 00:00:00 2001 From: zhengwei Date: Thu, 4 Feb 2021 10:46:50 +0800 Subject: [PATCH 4/4] fix: write fix --- core/elog/ali/writer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/elog/ali/writer.go b/core/elog/ali/writer.go index b4abe0a6..98a73fd5 100644 --- a/core/elog/ali/writer.go +++ b/core/elog/ali/writer.go @@ -122,8 +122,7 @@ func (w *writer) write(fields map[string]interface{}) (err error) { l := genLog(fields) // if bufferSize bigger then defaultBufferSize or channel is full, then flush logs w.ch <- l - atomic.AddInt32(w.curBu{"lv":"info","ts":1612351917,"msg":"","lv":"info","ts":"1612351912","msg":"oapms-be test","index":"5"} - fSize, int32(l.XXX_Size())) + atomic.AddInt32(w.curBufSize, int32(l.XXX_Size())) if atomic.LoadInt32(w.curBufSize) >= w.flushBufferSize || len(w.ch) >= cap(w.ch) { err = w.flush() }