diff --git a/plugins/out_forward.go b/plugins/out_forward.go index 606ba56..849cb10 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -1,87 +1,79 @@ package plugins import ( - "github.com/moriyoshi/ik" "bytes" - "github.com/ugorji/go/codec" + "fmt" "log" "net" "reflect" "strconv" + "sync" "time" + + "github.com/moriyoshi/ik" + "github.com/ugorji/go/codec" ) type ForwardOutput struct { - factory *ForwardOutputFactory - logger *log.Logger - codec *codec.MsgpackHandle - bind string - enc *codec.Encoder - conn net.Conn - buffer bytes.Buffer + factory *ForwardOutputFactory + logger *log.Logger + codec *codec.MsgpackHandle + bind string + enc *codec.Encoder + buffer *bytes.Buffer + emitCh chan []ik.FluentRecordSet + shutdown chan (chan error) + flushInterval int + flushWg sync.WaitGroup } func (output *ForwardOutput) encodeEntry(tag string, record ik.TinyFluentRecord) error { - v := []interface{} { tag, record.Timestamp, record.Data } + v := []interface{}{tag, record.Timestamp, record.Data} if output.enc == nil { - output.enc = codec.NewEncoder(&output.buffer, output.codec) - } - err := output.enc.Encode(v) - if err != nil { - return err + output.enc = codec.NewEncoder(output.buffer, output.codec) } - return err + return output.enc.Encode(v) } func (output *ForwardOutput) encodeRecordSet(recordSet ik.FluentRecordSet) error { - v := []interface{} { recordSet.Tag, recordSet.Records } + v := []interface{}{recordSet.Tag, recordSet.Records} if output.enc == nil { - output.enc = codec.NewEncoder(&output.buffer, output.codec) - } - err := output.enc.Encode(v) - if err != nil { - return err + output.enc = codec.NewEncoder(output.buffer, output.codec) } - return err + return output.enc.Encode(v) } func (output *ForwardOutput) flush() error { - if output.conn == nil { + if output.buffer.Len() == 0 { + return nil + } + buffer := output.buffer + output.buffer = &bytes.Buffer{} + output.enc = nil + + output.flushWg.Add(1) + go func() { // TODO: static goroutine for flushing. + defer output.flushWg.Done() conn, err := net.Dial("tcp", output.bind) if err != nil { - output.logger.Printf("%#v", err.Error()) - return err - } else { - output.conn = conn + output.logger.Printf("%#v", err) + return } - } - n, err := output.buffer.WriteTo(output.conn) - if err != nil { - output.logger.Printf("Write failed. size: %d, buf size: %d, error: %#v", n, output.buffer.Len(), err.Error()) - output.conn = nil - return err - } - if n > 0 { - output.logger.Printf("Forwarded: %d bytes (left: %d bytes)\n", n, output.buffer.Len()) - } - output.conn.Close() - output.conn = nil - return nil -} + defer conn.Close() -func (output *ForwardOutput) run_flush(flush_interval int) { - ticker := time.NewTicker(time.Duration(flush_interval) * time.Second) - go func() { - for { - select { - case <-ticker.C: - output.flush() - } + if n, err := buffer.WriteTo(conn); err != nil { + output.logger.Printf("Write failed. size: %d, buf size: %d, error: %#v", n, buffer.Len(), err) } }() + return nil } func (output *ForwardOutput) Emit(recordSet []ik.FluentRecordSet) error { + output.emitCh <- recordSet + return nil +} + +func (output *ForwardOutput) emit(recordSet []ik.FluentRecordSet) error { for _, recordSet := range recordSet { err := output.encodeRecordSet(recordSet) if err != nil { @@ -97,28 +89,57 @@ func (output *ForwardOutput) Factory() ik.Plugin { } func (output *ForwardOutput) Run() error { - time.Sleep(1000000000) + time.Sleep(time.Second) + // TODO: Should return something when finished? return ik.Continue } +func (output *ForwardOutput) mainLoop() { + ticker := time.NewTicker(time.Duration(output.flushInterval) * time.Second) + for { + select { + case rs := <-output.emitCh: + fmt.Println("output: ", rs) + if err := output.emit(rs); err != nil { + output.logger.Printf("%#v", err) + } + case <-ticker.C: + output.flush() + case finish := <-output.shutdown: + close(output.emitCh) + output.flush() + output.flushWg.Wait() + finish <- nil + return + } + } +} + func (output *ForwardOutput) Shutdown() error { - return nil + finish := make(chan error) + output.shutdown <- finish + return <-finish } type ForwardOutputFactory struct { } -func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind string) (*ForwardOutput, error) { +func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind string, flushInterval int) *ForwardOutput { _codec := codec.MsgpackHandle{} _codec.MapType = reflect.TypeOf(map[string]interface{}(nil)) _codec.RawToString = false _codec.StructToArray = true return &ForwardOutput{ - factory: factory, - logger: logger, - codec: &_codec, - bind: bind, - }, nil + factory: factory, + logger: logger, + codec: &_codec, + bind: bind, + buffer: &bytes.Buffer{}, + emitCh: make(chan []ik.FluentRecordSet), + shutdown: make(chan chan error), + flushInterval: flushInterval, + flushWg: sync.WaitGroup{}, + } } func (factory *ForwardOutputFactory) Name() string { @@ -134,19 +155,19 @@ func (factory *ForwardOutputFactory) New(engine ik.Engine, config *ik.ConfigElem if !ok { netPort = "24224" } - flush_interval_str, ok := config.Attrs["flush_interval"] + flushIntervalStr, ok := config.Attrs["flush_interval"] if !ok { - flush_interval_str = "60" + flushIntervalStr = "60" } - flush_interval, err := strconv.Atoi(flush_interval_str) + flushInterval, err := strconv.Atoi(flushIntervalStr) if err != nil { engine.Logger().Print(err.Error()) return nil, err } bind := host + ":" + netPort - output, err := newForwardOutput(factory, engine.Logger(), bind) - output.run_flush(flush_interval) - return output, err + output := newForwardOutput(factory, engine.Logger(), bind, flushInterval) + go output.mainLoop() + return output, nil } func (factory *ForwardOutputFactory) BindScorekeeper(scorekeeper *ik.Scorekeeper) {