diff --git a/filebeat/input/redis/harvester.go b/filebeat/input/redis/harvester.go index 27c53157ed27..043d04c2c1d1 100644 --- a/filebeat/input/redis/harvester.go +++ b/filebeat/input/redis/harvester.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/util" ) @@ -162,6 +163,12 @@ func (h *Harvester) Run() error { }, } + // 设置为完成状态,避免进度文件回写报错 + data.SetState(file.State{ + Type: "redis", + Finished: true, + }) + h.forwarder.Send(data) } return nil @@ -176,3 +183,4 @@ func (h *Harvester) Stop() { func (h *Harvester) ID() uuid.UUID { return h.id } + diff --git a/filebeat/input/redis/input.go b/filebeat/input/redis/input.go index 5b49fa9b7069..88745dd512c6 100644 --- a/filebeat/input/redis/input.go +++ b/filebeat/input/redis/input.go @@ -18,7 +18,9 @@ package redis import ( + "encoding/json" "os" + "strings" "time" rd "github.com/garyburd/redigo/redis" @@ -41,11 +43,37 @@ func init() { // Input is a input for redis type Input struct { - started bool - outlet channel.Outleter - config config - cfg *common.Config - registry *harvester.Registry + started bool + outlet channel.Outleter + config config + cfg *common.Config + registry *harvester.Registry + passwordMap map[string]string +} + +// loadPwdFile reads the redis password file and returns the password map +// sample file: https://github.com/oliver006/redis_exporter/blob/master/contrib/sample-pwd-file.json +func loadPwdFile(passwordFile string) (map[string]string, error) { + res := make(map[string]string) + + logp.Debug("start load password file: %s", passwordFile) + bytes, err := os.ReadFile(passwordFile) + if err != nil { + logp.Warn("load password file failed: %s", err) + return nil, err + } + err = json.Unmarshal(bytes, &res) + if err != nil { + logp.Warn("password file format error: %s", err) + return nil, err + } + + logp.Info("Loaded %d entries from %s", len(res), passwordFile) + for k := range res { + logp.Debug("%s", k) + } + + return res, nil } // NewInput creates a new redis input @@ -60,32 +88,26 @@ func NewInput(cfg *common.Config, outletFactory channel.Connector, context input } // 读取文件内容 - var content = "" + passwordMap := make(map[string]string) if config.PasswordFile != "" { - info, err := os.ReadFile(config.PasswordFile) + passwordMap, err = loadPwdFile(config.PasswordFile) if err != nil { - logp.Err("Read Password File Error: %s", err) - } else { - content = string(info) + logp.Err("Error loading redis passwords from file %s, err: %s", config.PasswordFile, err) } } - // 将文件内容赋值给 config.Password - if content != "" { - config.Password = content - } - outlet, err := outletFactory(cfg, context.DynamicFields) if err != nil { return nil, err } p := &Input{ - started: false, - outlet: outlet, - config: config, - cfg: cfg, - registry: harvester.NewRegistry(), + started: false, + outlet: outlet, + config: config, + cfg: cfg, + registry: harvester.NewRegistry(), + passwordMap: passwordMap, } return p, nil @@ -107,7 +129,18 @@ func (p *Input) Run() { forwarder := harvester.NewForwarder(p.outlet) for _, host := range p.config.Hosts { - pool := CreatePool(host, p.config.Password, p.config.Network, + uri := host + if !strings.Contains(uri, "://") { + uri = "redis://" + uri + } + + // 判断 password file 中是否存在该域名的密码配置,如果不存在,则使用默认密码 + password, ok := p.passwordMap[uri] + if !ok { + password = p.config.Password + } + + pool := CreatePool(host, password, p.config.Network, p.config.MaxConn, p.config.IdleTimeout, p.config.IdleTimeout) h, err := NewHarvester(pool.Get())