Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for elasticsearch v8 #41

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ before_script:
- sudo sysctl -w vm.max_map_count=262144

before_install:
- docker run -d -p 7777:9200 -e "discovery.type=single-node" --name elk docker.elastic.co/elasticsearch/elasticsearch:7.0.0
- docker run -d -p 7777:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms750m -Xmx750m" -e "ELASTIC_PASSWORD=1234" --name elk elasticsearch:8.13.0
- docker logs elk
- docker inspect elk
- travis_wait 5
Expand Down
38 changes: 26 additions & 12 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@

*Here's the version matrix:*

ElasticSearch version | Elastic version | Package URL | Remarks |
----------------------|------------------|------------------------------------------|---------|
7.x | 7.0 | [`gopkg.in/sohlich/elogrus.v7`](http://gopkg.in/sohlich/elogrus.v7)| Use Go modules.
6.x | 6.0 | [`gopkg.in/sohlich/elogrus.v3`](http://gopkg.in/sohlich/elogrus.v3)| Actively maintained.
5.x | 5.0 | [`gopkg.in/sohlich/elogrus.v2`](http://gopkg.in/sohlich/elogrus.v2)| Actively maintained.
2.x | 3.0 | [`gopkg.in/sohlich/elogrus.v1`](http://gopkg.in/sohlich/elogrus.v1)| Deprecated. Please update.
| ElasticSearch version | Elastic version | go-elasticsearch | Package URL | Remarks |
|------------------------|------------------|------------------|------------------------------|-------------------------------------------------------------------|
| 8.x | X | 8.13.1 | | Use the new typed elasticsearch api and underling data stream API |
| 7.x | 7.0 | X | gopkg.in/sohlich/elogrus.v7 | Use Go modules. |
| 6.x | 6.0 | X | gopkg.in/sohlich/elogrus.v3 | Actively maintained. |
| 5.x | 5.0 | X | gopkg.in/sohlich/elogrus.v2 | Actively maintained. |
| 2.x | 3.0 | X | gopkg.in/sohlich/elogrus.v1 | Deprecated. Please update. |

*For elastic search 8.x*
```bash
# We name v8 to align with elastic v8
go get gopkg.in/sohlich/elogrus.v8
go get github.com/elastic/go-elasticsearch/v8
```

*For elastic search 7.x*
```bash
Expand All @@ -35,26 +43,32 @@ go get gopkg.in/olivere/elastic.v5
```

## Changelog
- elastic 7.x support (currently in master)
- elastic search 8.x support (currently in master). **Breaking change**: we ditched the deprecated [elastic](https://github.com/olivere/elastic) driver from [olivere](https://github.com/olivere) and switched to the official [go-elasticsearch](https://github.com/elastic/go-elasticsearch) driver. It's recommended to use the data stream api for log messages. So name your "index" as `logs-XYZ` and use the `logs-` prefix in the `IndexName` field.
- elastic 7.x support

## Usage

```go
package main

import (
"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8"
"github.com/sirupsen/logrus"
"gopkg.in/sohlich/elogrus.v7"
"gopkg.in/sohlich/elogrus.v8"
)

func main() {
log := logrus.New()
client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
client, err := elasticsearch.NewTypedClient(
elasticsearch.Config{
Addresses: []string{"https://localhost:9200"},
Username: "elastic",
Password: "1234",
})
if err != nil {
log.Panic(err)
}
hook, err := elogrus.NewAsyncElasticHook(client, "localhost", logrus.DebugLevel, "mylog")
hook, err := elogrus.NewAsyncElasticHook(client, "localhost", logrus.DebugLevel, "logs-mylog")
if err != nil {
log.Panic(err)
}
Expand All @@ -71,6 +85,6 @@ func main() {

```go
...
elogrus.NewAsyncElasticHook(client, "localhost", logrus.DebugLevel, "mylog")
elogrus.NewAsyncElasticHook(client, "localhost", logrus.DebugLevel, "logs-mylog")
...
```
20 changes: 17 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
module gopkg.in/sohlich/elogrus.v7

go 1.12
go 1.21

toolchain go1.22.2

require (
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect
github.com/olivere/elastic/v7 v7.0.4
github.com/elastic/go-elasticsearch/v8 v8.13.1
github.com/sirupsen/logrus v1.4.2
)

require (
github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect
github.com/pkg/errors v0.8.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
golang.org/x/sys v0.14.0 // indirect
)
28 changes: 25 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0=
github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg=
github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand All @@ -28,8 +37,9 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -71,9 +81,18 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand All @@ -100,8 +119,9 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -121,5 +141,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
95 changes: 41 additions & 54 deletions hook.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package elogrus

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/sirupsen/logrus"
)

var (
// ErrCannotCreateIndex Fired if the index is not created
ErrCannotCreateIndex = fmt.Errorf("cannot create index")
)

// IndexNameFunc get index name
type IndexNameFunc func() string

Expand All @@ -23,7 +21,7 @@ type fireFunc func(entry *logrus.Entry, hook *ElasticHook) error
// ElasticHook is a logrus
// hook for ElasticSearch
type ElasticHook struct {
client *elastic.Client
client *elasticsearch.TypedClient
host string
index IndexNameFunc
levels []logrus.Level
Expand All @@ -33,21 +31,21 @@ type ElasticHook struct {
}

type message struct {
Host string `json:"Host,omitempty"`
Host string `json:"host,omitempty"`
Timestamp string `json:"@timestamp"`
File string `json:"File,omitempty"`
Func string `json:"Func,omitempty"`
Message string `json:"Message,omitempty"`
File string `json:"file,omitempty"`
Func string `json:"func,omitempty"`
Message string `json:"message,omitempty"`
Data logrus.Fields
Level string `json:"Level,omitempty"`
Level string `json:"level,omitempty"`
}

// NewElasticHook creates new hook.
// client - ElasticSearch client with specific es version (v5/v6/v7/...)
// host - host of system
// level - log level
// index - name of the index in ElasticSearch
func NewElasticHook(client *elastic.Client, host string, level logrus.Level, index string) (*ElasticHook, error) {
func NewElasticHook(client *elasticsearch.TypedClient, host string, level logrus.Level, index string) (*ElasticHook, error) {
return NewElasticHookWithFunc(client, host, level, func() string { return index })
}

Expand All @@ -56,7 +54,7 @@ func NewElasticHook(client *elastic.Client, host string, level logrus.Level, ind
// host - host of system
// level - log level
// index - name of the index in ElasticSearch
func NewAsyncElasticHook(client *elastic.Client, host string, level logrus.Level, index string) (*ElasticHook, error) {
func NewAsyncElasticHook(client *elasticsearch.TypedClient, host string, level logrus.Level, index string) (*ElasticHook, error) {
return NewAsyncElasticHookWithFunc(client, host, level, func() string { return index })
}

Expand All @@ -65,7 +63,7 @@ func NewAsyncElasticHook(client *elastic.Client, host string, level logrus.Level
// host - host of system
// level - log level
// index - name of the index in ElasticSearch
func NewBulkProcessorElasticHook(client *elastic.Client, host string, level logrus.Level, index string) (*ElasticHook, error) {
func NewBulkProcessorElasticHook(client *elasticsearch.TypedClient, host string, level logrus.Level, index string) (*ElasticHook, error) {
return NewBulkProcessorElasticHookWithFunc(client, host, level, func() string { return index })
}

Expand All @@ -76,7 +74,7 @@ func NewBulkProcessorElasticHook(client *elastic.Client, host string, level logr
// host - host of system
// level - log level
// indexFunc - function providing the name of index
func NewElasticHookWithFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
func NewElasticHookWithFunc(client *elasticsearch.TypedClient, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
return newHookFuncAndFireFunc(client, host, level, indexFunc, syncFireFunc)
}

Expand All @@ -87,7 +85,7 @@ func NewElasticHookWithFunc(client *elastic.Client, host string, level logrus.Le
// host - host of system
// level - log level
// indexFunc - function providing the name of index
func NewAsyncElasticHookWithFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
func NewAsyncElasticHookWithFunc(client *elasticsearch.TypedClient, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
return newHookFuncAndFireFunc(client, host, level, indexFunc, asyncFireFunc)
}

Expand All @@ -99,15 +97,15 @@ func NewAsyncElasticHookWithFunc(client *elastic.Client, host string, level logr
// host - host of system
// level - log level
// indexFunc - function providing the name of index
func NewBulkProcessorElasticHookWithFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
func NewBulkProcessorElasticHookWithFunc(client *elasticsearch.TypedClient, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
fireFunc, err := makeBulkFireFunc(client)
if err != nil {
return nil, err
}
return newHookFuncAndFireFunc(client, host, level, indexFunc, fireFunc)
}

func newHookFuncAndFireFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc, fireFunc fireFunc) (*ElasticHook, error) {
func newHookFuncAndFireFunc(client *elasticsearch.TypedClient, host string, level logrus.Level, indexFunc IndexNameFunc, fireFunc fireFunc) (*ElasticHook, error) {
var levels []logrus.Level
for _, l := range []logrus.Level{
logrus.PanicLevel,
Expand All @@ -125,25 +123,6 @@ func newHookFuncAndFireFunc(client *elastic.Client, host string, level logrus.Le

ctx, cancel := context.WithCancel(context.TODO())

// Use the IndexExists service to check if a specified index exists.
exists, err := client.IndexExists(indexFunc()).Do(ctx)
if err != nil {
// Handle error
cancel()
return nil, err
}
if !exists {
createIndex, err := client.CreateIndex(indexFunc()).Do(ctx)
if err != nil {
cancel()
return nil, err
}
if !createIndex.Acknowledged {
cancel()
return nil, ErrCannotCreateIndex
}
}

return &ElasticHook{
client: client,
host: host,
Expand Down Expand Up @@ -195,30 +174,38 @@ func createMessage(entry *logrus.Entry, hook *ElasticHook) *message {

func syncFireFunc(entry *logrus.Entry, hook *ElasticHook) error {
_, err := hook.client.
Index().
Index(hook.index()).
Type("log").
BodyJson(*createMessage(entry, hook)).
Request(*createMessage(entry, hook)).
Do(hook.ctx)

return err
}

// Create closure with bulk processor tied to fireFunc.
func makeBulkFireFunc(client *elastic.Client) (fireFunc, error) {
processor, err := client.BulkProcessor().
Name("elogrus.v3.bulk.processor").
Workers(2).
FlushInterval(time.Second).
Do(context.Background())

func makeBulkFireFunc(client *elasticsearch.TypedClient) (fireFunc, error) {
c, _ := elasticsearch.NewDefaultClient()
c.Transport = client.Transport

indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c,
NumWorkers: 2,
FlushInterval: time.Second,
})
return func(entry *logrus.Entry, hook *ElasticHook) error {
r := elastic.NewBulkIndexRequest().
Index(hook.index()).
Type("log").
Doc(*createMessage(entry, hook))
processor.Add(r)
return nil
data, err := json.Marshal(createMessage(entry, hook))
if err != nil {
return err
}

r := esutil.BulkIndexerItem{
Index: hook.index(),
Action: "create",
Body: bytes.NewReader(data),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, err error) {
fmt.Printf("Failed to index log message %v", err)
},
}
return indexer.Add(context.TODO(), r)
}, err
}

Expand Down
Loading