Skip to content

Commit

Permalink
Merge pull request #46 from ansakharov/metric-mask-plugin
Browse files Browse the repository at this point in the history
Add optional metric to mask plugin
  • Loading branch information
ansakharov authored Feb 3, 2022
2 parents 143a804 + 83e255e commit 51bda38
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 71 deletions.
13 changes: 9 additions & 4 deletions pipeline/metrics_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ import (
"go.uber.org/atomic"
)

const PromNamespace = "file_d"

// metricHolder has nextMetricsGen method that creates new prometheus.CounterOpts,
// differs form previous only by prometheus.CounterOpts.ConstLabels: {"gen": incValue}.
// This decision help throw away from memory old metric\s, that wasn't written for a while.
// TODO create better mechanism that'll delete only old metric\s, that aren't in use for N minutes.
type metricsHolder struct {
pipelineName string
metricsGen int // generation is used to drop unused metrics from counters
metricsGen int // generation is used to drop unused metrics from counters.
metricsGenTime time.Time
metricsGenInterval time.Duration
metrics []*metrics
Expand Down Expand Up @@ -90,9 +96,8 @@ func (m *metricsHolder) nextMetricsGen() {
for _, st := range allEventStatuses() {
cnt.totalCounter[string(st)] = atomic.NewUint64(0)
}

opts := prometheus.CounterOpts{
Namespace: "file_d",
Namespace: PromNamespace,
Subsystem: "pipeline_" + m.pipelineName,
Name: metrics.name + "_events_count_total",
Help: fmt.Sprintf("how many events processed by pipeline %q and #%d action", m.pipelineName, index),
Expand All @@ -101,7 +106,7 @@ func (m *metricsHolder) nextMetricsGen() {
cnt.count = prometheus.NewCounterVec(opts, append([]string{"status"}, metrics.labels...))

opts = prometheus.CounterOpts{
Namespace: "file_d",
Namespace: PromNamespace,
Subsystem: "pipeline_" + m.pipelineName,
Name: metrics.name + "_events_size_total",
Help: fmt.Sprintf("total size of events processed by pipeline %q and #%d action", m.pipelineName, index),
Expand Down
2 changes: 2 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ go run cmd/file.d.go --config=config.yaml
curl "localhost:9200/_bulk" -H 'Content-Type: application/json' -d \
'{"index":{"_index":"index-main","_type":"span"}}
{"message": "hello", "kind": "normal"}
'
##
Expand Down Expand Up @@ -230,6 +231,7 @@ pipelines:
...
actions:
- type: mask
metric_subsystem_name: "some_name"
masks:
- mask:
re: "\b(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\b"
Expand Down
1 change: 1 addition & 0 deletions plugin/action/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pipelines:
...
actions:
- type: mask
metric_subsystem_name: "some_name"
masks:
- mask:
re: "\b(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\b"
Expand Down
10 changes: 9 additions & 1 deletion plugin/action/mask/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pipelines:
...
actions:
- type: mask
metric_subsystem_name: "some_name"
masks:
- mask:
re: "\b(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\b"
Expand All @@ -18,9 +19,15 @@ pipelines:


### Config params
**`metric_subsystem_name`** *`*string`*

If set counterMetric with this name would be sent on metric_subsystem_name.mask_plugin

<br>

**`masks`** *`[]Mask`*

List of masks
List of masks.

<br>

Expand All @@ -32,6 +39,7 @@ Regular expression for masking

**`groups`** *`[]int`* *`required`*

Numbers of masking groups in expression, zero for mask all expression

<br>

Expand Down
70 changes: 56 additions & 14 deletions plugin/action/mask/mask.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)
Expand All @@ -21,6 +22,7 @@ pipelines:
...
actions:
- type: mask
metric_subsystem_name: "some_name"
masks:
- mask:
re: "\b(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\D?(\d{1,4})\b"
Expand All @@ -30,22 +32,34 @@ pipelines:
}*/

const substitution = byte('*')
const (
substitution = byte('*')

metricName = "mask_plugin"
)

var MaskPromCounter = prometheus.NewCounter(prometheus.CounterOpts{})

type Plugin struct {
config *Config
sourceBuf []byte
maskBuf []byte
valueNodes []*insaneJSON.Node
logger *zap.SugaredLogger
config *Config
sourceBuf []byte
maskBuf []byte
logMaskAppeared bool
valueNodes []*insaneJSON.Node
logger *zap.SugaredLogger
}

//! config-params
//^ config-params
type Config struct {
//> @3@4@5@6
//>
//> List of masks
//> If set counterMetric with this name would be sent on metric_subsystem_name.mask_plugin
MetricSubsystemName *string `json:"metric_subsystem_name" required:"false"` //*

//> @3@4@5@6
//>
//> List of masks.
Masks []Mask `json:"masks"` //*
}

Expand All @@ -58,7 +72,7 @@ type Mask struct {

//> @3@4@5@6
//>
// Numbers of masking groups in expression, zero for mask all expression
//> Numbers of masking groups in expression, zero for mask all expression
Groups []int `json:"groups" required:"true"` //*
}

Expand Down Expand Up @@ -134,9 +148,26 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
p.valueNodes = make([]*insaneJSON.Node, 0)
p.logger = params.Logger
p.config.Masks = compileMasks(p.config.Masks, p.logger)
if p.config.MetricSubsystemName != nil {
p.logMaskAppeared = true
p.registerPluginMetrics(pipeline.PromNamespace, *p.config.MetricSubsystemName, metricName)
}
}

func (p Plugin) Stop() {
func (p *Plugin) Stop() {
}

func (p *Plugin) registerPluginMetrics(namespace, subsystem, metricName string) {
// can't declare counter as property on p.counter, because multiple cores
// will create multiple metrics and all but last one will be unregistered.
MaskPromCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: metricName,
Help: "",
})
prometheus.DefaultRegisterer.Unregister(MaskPromCounter)
prometheus.DefaultRegisterer.MustRegister(MaskPromCounter)
}

func appendMask(dst, src []byte, begin, end int) ([]byte, int) {
Expand All @@ -160,10 +191,11 @@ func maskSection(dst, src []byte, begin, end int) ([]byte, int) {
return dst, offset
}

func maskValue(value, buf []byte, re *regexp.Regexp, groups []int, l *zap.SugaredLogger) []byte {
// mask value returns masked value and bool answer was buf masked at all.
func maskValue(value, buf []byte, re *regexp.Regexp, groups []int) ([]byte, bool) {
indexes := re.FindAllSubmatchIndex(value, -1)
if len(indexes) == 0 {
return value
return value, false
}

buf = buf[:0]
Expand All @@ -180,7 +212,7 @@ func maskValue(value, buf []byte, re *regexp.Regexp, groups []int, l *zap.Sugare
}
}

return value
return value, true
}

func getValueNodeList(currentNode *insaneJSON.Node, valueNodes []*insaneJSON.Node) []*insaneJSON.Node {
Expand All @@ -204,19 +236,29 @@ func getValueNodeList(currentNode *insaneJSON.Node, valueNodes []*insaneJSON.Nod
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
root := event.Root.Node

// apply vars need to check if mask was applied to event data and send metric.
maskApplied := false
locApplied := false

p.valueNodes = p.valueNodes[:0]
p.valueNodes = getValueNodeList(root, p.valueNodes)

for _, v := range p.valueNodes {
value := v.AsBytes()
p.sourceBuf = append(p.sourceBuf[:0], value...)
p.maskBuf = append(p.maskBuf[:0], p.sourceBuf...)
for _, mask := range p.config.Masks {
p.maskBuf = maskValue(p.sourceBuf, p.maskBuf, mask.Re_, mask.Groups, p.logger)
p.maskBuf, locApplied = maskValue(p.sourceBuf, p.maskBuf, mask.Re_, mask.Groups)
p.sourceBuf = p.maskBuf
if locApplied {
maskApplied = true
}
}
v.MutateToString(string(p.maskBuf))
}
if p.logMaskAppeared && maskApplied {
MaskPromCounter.Inc()
p.logger.Infof("mask appeared to event, output string: %s", event.Root.EncodeToString())
}

return pipeline.ActionPass
}
Loading

0 comments on commit 51bda38

Please sign in to comment.