Skip to content

Commit

Permalink
Merge pull request #52 from gotomicro/feature/log-ali
Browse files Browse the repository at this point in the history
feat: add prometheus metric && support send logs for concurrency && a…
  • Loading branch information
askuy authored Feb 4, 2021
2 parents abfcac8 + 9a39cd2 commit 62c36fa
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 138 deletions.
15 changes: 1 addition & 14 deletions core/elog/ali/log_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

Expand Down Expand Up @@ -80,7 +79,7 @@ func (p *LogProject) ListLogStore() (storeNames []string, err error) {
return
}

func (p *LogProject) parseEndpoint() {
func (p *LogProject) initHost() {
scheme := httpScheme // default to http scheme
host := p.endpoint

Expand All @@ -91,18 +90,6 @@ func (p *LogProject) parseEndpoint() {
host = strings.TrimPrefix(p.endpoint, scheme)
}

if ipRegex.MatchString(host) {
// use direct ip proxy
u, err := url.Parse(fmt.Sprintf("%s%s", scheme, host))
if err != nil {
return
}
cli := p.cli.GetClient()
cli.Transport = &http.Transport{
Proxy: http.ProxyURL(u),
}
p.cli = resty.NewWithClient(cli)
}
if p.name == "" {
p.host = fmt.Sprintf("%s%s", scheme, host)
} else {
Expand Down
6 changes: 6 additions & 0 deletions core/elog/ali/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,9 @@ func WithApiRetryMaxWaitTime(apiRetryMaxWaitTime time.Duration) Option {
c.apiRetryMaxWaitTime = apiRetryMaxWaitTime
}
}

func WithFallbackCore(core zapcore.Core) Option {
return func(c *config) {
c.fallbackCore = core
}
}
4 changes: 0 additions & 4 deletions core/elog/ali/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"regexp"

"github.com/go-resty/resty/v2"
)
Expand All @@ -15,11 +14,8 @@ const requestIDHeader = "x-log-requestid"
const (
httpScheme = "http://"
httpsScheme = "https://"
ipRegexStr = `\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}.*`
)

var ipRegex = regexp.MustCompile(ipRegexStr)

type badResError struct {
body string
header map[string][]string
Expand Down
115 changes: 70 additions & 45 deletions core/elog/ali/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@ import (
"context"
"fmt"
"log"
"math"
"sync"
"sync/atomic"
"time"

"github.com/go-resty/resty/v2"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/gotomicro/ego/core/eapp"
"github.com/gotomicro/ego/core/elog/ali/pb"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/util/xcast"
)

const (
// flushSize sets the flush size
flushSize int = 128
// entryChanSize sets the logs size
entryChanSize int = 4096
// observe interval
observeInterval = 5 * time.Second
)

type LogContent = pb.Log_Content
Expand All @@ -42,17 +48,18 @@ type config struct {
apiRetryCount int
apiRetryWaitTime time.Duration
apiRetryMaxWaitTime time.Duration
fallbackCore zapcore.Core
}

// writer implements LoggerInterface.
// Writes messages in keep-live tcp connection.
type writer struct {
store *LogStore
group []*pb.LogGroup
withMap bool
ch chan *pb.Log
curBufSize *int32
cancel context.CancelFunc
fallbackCore zapcore.Core
store *LogStore
ch chan *pb.Log
lock sync.Mutex
curBufSize *int32
cancel context.CancelFunc
config
}

Expand All @@ -66,14 +73,18 @@ func retryCondition(r *resty.Response, err error) bool {

// newWriter creates a new ali writer
func newWriter(c config) (*writer, error) {
w := &writer{config: c, ch: make(chan *pb.Log, c.apiBulkSize), curBufSize: new(int32)}
entryChanSize := entryChanSize
if c.apiBulkSize >= entryChanSize {
c.apiBulkSize = entryChanSize
}
w := &writer{config: c, ch: make(chan *pb.Log, entryChanSize), curBufSize: new(int32)}
p := &LogProject{
name: w.project,
endpoint: w.endpoint,
accessKeyID: w.accessKeyID,
accessKeySecret: w.accessKeySecret,
}
p.parseEndpoint()
p.initHost()
p.cli = resty.New().
SetDebug(eapp.IsDevelopmentMode()).
SetHostURL(p.host).
Expand All @@ -87,25 +98,9 @@ func newWriter(c config) (*writer, error) {
return nil, fmt.Errorf("getlogstroe fail,%w", err)
}
w.store = store

// Create default Log Group
w.group = append(w.group, &pb.LogGroup{
Topic: proto.String(""),
Source: proto.String(w.source),
Logs: make([]*pb.Log, 0, w.apiBulkSize),
})

// Create other Log Group
for _, topic := range w.topics {
lg := &pb.LogGroup{
Topic: proto.String(topic),
Source: proto.String(w.source),
Logs: make([]*pb.Log, 0, w.apiBulkSize),
}
w.group = append(w.group, lg)
}

w.Sync()
w.fallbackCore = c.fallbackCore
w.sync()
w.observe()
return w, nil
}

Expand Down Expand Up @@ -135,28 +130,49 @@ func (w *writer) write(fields map[string]interface{}) (err error) {
}

func (w *writer) flush() error {
// TODO sync pool
var lg = *w.group[0]
chlen := len(w.ch)
if chlen == 0 {
w.lock.Lock()
entriesChLen := len(w.ch)
if entriesChLen == 0 {
w.lock.Unlock()
return nil
}
var logs = make([]*pb.Log, 0, chlen)
logs = append(logs, <-w.ch)
L1:
for {
select {
case l := <-w.ch:
logs = append(logs, l)
default:
break L1
var waitedEntries = make([]*pb.Log, 0, entriesChLen)
for i := 0; i < entriesChLen; i++ {
waitedEntries = append(waitedEntries, <-w.ch)
}
w.lock.Unlock()

chunks := int(math.Ceil(float64(len(waitedEntries)) / float64(w.apiBulkSize)))
for i := 0; i < chunks; i++ {
go func(start int) {
end := (start + 1) * w.apiBulkSize
if end > len(waitedEntries) {
end = len(waitedEntries)
}
lg := pb.LogGroup{Logs: waitedEntries[start:end]}
if e := w.store.PutLogs(&lg); e != nil {
// if error occurs we put logs to fallback logger
w.writeToFallbackLogger(lg)
}
}(i)
}

return nil
}

func (w *writer) writeToFallbackLogger(lg pb.LogGroup) {
for _, v := range lg.Logs {
fields := make([]zapcore.Field, len(v.Contents), len(v.Contents))
for i, val := range v.Contents {
fields[i] = zap.String(val.GetKey(), val.GetValue())
}
if e := w.fallbackCore.Write(zapcore.Entry{Time: time.Now()}, fields); e != nil {
log.Println("fallbackCore write fail", e)
}
}
lg.Logs = logs
return w.store.PutLogs(&lg)
}

func (w *writer) Sync() {
func (w *writer) sync() {
ctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
ticker := time.NewTicker(w.flushBufferInterval)
Expand All @@ -175,3 +191,12 @@ func (w *writer) Sync() {
}()
return
}

func (w *writer) observe() {
go func() {
for {
emetric.LibHandleSummary.Observe(float64(len(w.ch)), "elog", "ali_waited_entries")
time.Sleep(observeInterval)
}
}()
}
109 changes: 63 additions & 46 deletions core/elog/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/gotomicro/ego/core/econf"
"github.com/gotomicro/ego/core/elog/ali"
"github.com/gotomicro/ego/core/util/xcolor"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
Expand Down Expand Up @@ -76,56 +77,73 @@ var (
ByteString = zap.ByteString
)

func newCore(config *Config, lv zap.AtomicLevel) (zapcore.Core, CloseFunc) {
core := config.core
var asyncStopFunc CloseFunc
const (
defaultAliFallbackCorePath = "ali.log"
)

encoderConfig := *config.encoderConfig
if config.Writer == writerRotateFile {
// Debug output to console and file by default
var ws = zapcore.AddSync(newRotate(config))
if config.Debug {
ws = zap.CombineWriteSyncers(os.Stdout, ws)
}
if config.EnableAsync {
ws, asyncStopFunc = Buffer(ws, config.FlushBufferSize, config.FlushBufferInterval)
// newRotateFileCore construct a rotate file zapcore.Core
func newRotateFileCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, cf CloseFunc) {
// Debug output to console and file by default
var ws = zapcore.AddSync(newRotate(config))
if config.Debug {
ws = zap.CombineWriteSyncers(os.Stdout, ws)
}
if config.EnableAsync {
ws, cf = Buffer(ws, config.FlushBufferSize, config.FlushBufferInterval)
}
core = zapcore.NewCore(
func() zapcore.Encoder {
if config.Debug {
return zapcore.NewConsoleEncoder(*config.encoderConfig)
}
return zapcore.NewJSONEncoder(*config.encoderConfig)
}(),
ws,
lv,
)
return core, cf
}

// newAliCore construct a ali SLS zapcore.Core
func newAliCore(config *Config, lv zap.AtomicLevel) (core zapcore.Core, cf CloseFunc) {
c := *config
c.Name = defaultAliFallbackCorePath
fallbackCore, fallbackCoreCf := newRotateFileCore(&c, lv)
core, cf = ali.NewCore(
ali.WithEncoder(ali.NewMapObjEncoder(*config.encoderConfig)),
ali.WithEndpoint(config.AliEndpoint),
ali.WithAccessKeyID(config.AliAccessKeyID),
ali.WithAccessKeySecret(config.AliAccessKeySecret),
ali.WithProject(config.AliProject),
ali.WithLogstore(config.AliLogstore),
ali.WithLevelEnabler(lv),
ali.WithFlushBufferSize(config.FlushBufferSize),
ali.WithFlushBufferInterval(config.FlushBufferInterval),
ali.WithApiBulkSize(config.AliApiBulkSize),
ali.WithApiTimeout(config.AliApiTimeout),
ali.WithApiRetryCount(config.AliApiRetryCount),
ali.WithApiRetryWaitTime(config.AliApiRetryWaitTime),
ali.WithApiRetryMaxWaitTime(config.AliApiRetryMaxWaitTime),
ali.WithFallbackCore(fallbackCore),
)
return core, func() (err error) {
if e := cf(); e != nil {
err = fmt.Errorf("exec close func fail, %w ", e)
}

if core == nil {
core = zapcore.NewCore(
func() zapcore.Encoder {
if config.Debug {
return zapcore.NewConsoleEncoder(encoderConfig)
}
return zapcore.NewJSONEncoder(encoderConfig)
}(),
ws,
lv,
)
if e := fallbackCoreCf(); e != nil {
err = fmt.Errorf("exec fallbackCore close func fail, %w", e)
}
return core, asyncStopFunc
return
}
}

func newCore(config *Config, lv zap.AtomicLevel) (zapcore.Core, CloseFunc) {
if config.Writer == writerRotateFile {
return newRotateFileCore(config, lv)
}
if config.Writer == writerAliSLS {
core, asyncStopFunc = ali.NewCore(
ali.WithEncoder(ali.NewMapObjEncoder(encoderConfig)),
ali.WithEndpoint(config.AliEndpoint),
ali.WithAccessKeyID(config.AliAccessKeyID),
ali.WithAccessKeySecret(config.AliAccessKeySecret),
ali.WithProject(config.AliProject),
ali.WithLogstore(config.AliLogstore),
ali.WithLevelEnabler(lv),
ali.WithFlushBufferSize(config.FlushBufferSize),
ali.WithFlushBufferInterval(config.FlushBufferInterval),
ali.WithApiBulkSize(config.AliApiBulkSize),
ali.WithApiTimeout(config.AliApiTimeout),
ali.WithApiRetryCount(config.AliApiRetryCount),
ali.WithApiRetryWaitTime(config.AliApiRetryWaitTime),
ali.WithApiRetryMaxWaitTime(config.AliApiRetryMaxWaitTime),
)
return core, nil
return newAliCore(config, lv)
}

return nil, nil
}

Expand Down Expand Up @@ -436,7 +454,6 @@ func panicDetail(msg string, fields ...Field) {
for key, val := range enc.Fields {
fmt.Printf(" %s: %s\n", xcolor.Red(key), fmt.Sprintf("%+v", val))
}

}

// With ...
Expand Down
Loading

0 comments on commit 62c36fa

Please sign in to comment.