Skip to content

Commit

Permalink
Merge pull request #22 from jayjiahua/master
Browse files Browse the repository at this point in the history
feat: change redis password file format to json
  • Loading branch information
liuwenping authored Jun 16, 2023
2 parents cc22bac + b345ef2 commit 9d7030b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 21 deletions.
8 changes: 8 additions & 0 deletions filebeat/input/redis/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
)

Expand Down Expand Up @@ -162,6 +163,12 @@ func (h *Harvester) Run() error {
},
}

// 设置为完成状态,避免进度文件回写报错
data.SetState(file.State{
Type: "redis",
Finished: true,
})

h.forwarder.Send(data)
}
return nil
Expand All @@ -176,3 +183,4 @@ func (h *Harvester) Stop() {
func (h *Harvester) ID() uuid.UUID {
return h.id
}

75 changes: 54 additions & 21 deletions filebeat/input/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package redis

import (
"encoding/json"
"os"
"strings"
"time"

rd "github.com/garyburd/redigo/redis"
Expand All @@ -41,11 +43,37 @@ func init() {

// Input is a input for redis
type Input struct {
started bool
outlet channel.Outleter
config config
cfg *common.Config
registry *harvester.Registry
started bool
outlet channel.Outleter
config config
cfg *common.Config
registry *harvester.Registry
passwordMap map[string]string
}

// loadPwdFile reads the redis password file and returns the password map
// sample file: https://github.com/oliver006/redis_exporter/blob/master/contrib/sample-pwd-file.json
func loadPwdFile(passwordFile string) (map[string]string, error) {
res := make(map[string]string)

logp.Debug("start load password file: %s", passwordFile)
bytes, err := os.ReadFile(passwordFile)
if err != nil {
logp.Warn("load password file failed: %s", err)
return nil, err
}
err = json.Unmarshal(bytes, &res)
if err != nil {
logp.Warn("password file format error: %s", err)
return nil, err
}

logp.Info("Loaded %d entries from %s", len(res), passwordFile)
for k := range res {
logp.Debug("%s", k)
}

return res, nil
}

// NewInput creates a new redis input
Expand All @@ -60,32 +88,26 @@ func NewInput(cfg *common.Config, outletFactory channel.Connector, context input
}

// 读取文件内容
var content = ""
passwordMap := make(map[string]string)
if config.PasswordFile != "" {
info, err := os.ReadFile(config.PasswordFile)
passwordMap, err = loadPwdFile(config.PasswordFile)
if err != nil {
logp.Err("Read Password File Error: %s", err)
} else {
content = string(info)
logp.Err("Error loading redis passwords from file %s, err: %s", config.PasswordFile, err)
}
}

// 将文件内容赋值给 config.Password
if content != "" {
config.Password = content
}

outlet, err := outletFactory(cfg, context.DynamicFields)
if err != nil {
return nil, err
}

p := &Input{
started: false,
outlet: outlet,
config: config,
cfg: cfg,
registry: harvester.NewRegistry(),
started: false,
outlet: outlet,
config: config,
cfg: cfg,
registry: harvester.NewRegistry(),
passwordMap: passwordMap,
}

return p, nil
Expand All @@ -107,7 +129,18 @@ func (p *Input) Run() {

forwarder := harvester.NewForwarder(p.outlet)
for _, host := range p.config.Hosts {
pool := CreatePool(host, p.config.Password, p.config.Network,
uri := host
if !strings.Contains(uri, "://") {
uri = "redis://" + uri
}

// 判断 password file 中是否存在该域名的密码配置,如果不存在,则使用默认密码
password, ok := p.passwordMap[uri]
if !ok {
password = p.config.Password
}

pool := CreatePool(host, password, p.config.Network,
p.config.MaxConn, p.config.IdleTimeout, p.config.IdleTimeout)

h, err := NewHarvester(pool.Get())
Expand Down

0 comments on commit 9d7030b

Please sign in to comment.