Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor_calyptia: new lua processor for logs/metrics/traces #9323

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions cmake/plugins_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DEFINE_OPTION(FLB_IN_WINSTAT "Enable Windows Stat input plugin"
# Processors
# ==========
DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_CALYPTIA "Enable calyptia core lua processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
reportingInstance"=>"EMPTY
19 changes: 19 additions & 0 deletions examples/processor_calyptia/logs_empty_key/fluent-bit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
service:
flush: 1
log_level: error

pipeline:
inputs:
- name: dummy
tag: event
dummy: |-
{"involvedObject":{"uid":"33146c89-69d9-4d99-b8f9-f256278732c9","kind":"Pod","resourceVersion":"381","apiVersion":"v1","name":"calyptia-core-controller-manager-7d64bbcdbc-fdpd9","namespace":"calyptia"},"reportingComponent":"default-scheduler","reportingInstance":"","reason":"FailedScheduling","type":"Warning","message":"0/1 nodes are available: 1 node(s) had untolerated taint {node.kubernetes.io/not-ready: }. preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling.","source":{"component":"default-scheduler"},"lastTimestamp":"2024-07-04T12:14:48Z","firstTimestamp":"2024-07-04T12:14:48Z","metadata":{"uid":"0f537d3a-df6d-478f-9ead-54c0be200ea7","namespace":"calyptia","resourceVersion":"403","creationTimestamp":"2024-07-04T12:14:48Z","name":"calyptia-core-controller-manager-7d64bbcdbc-fdpd9.17df018d10d3fc8a","managedFields":[{"manager":"kube-scheduler","operation":"Update","apiVersion":"v1","fieldsType":"FieldsV1","time":"2024-07-04T12:14:48Z"}]}}
processors:
logs:
- name: calyptia
script: script.lua
call: process_logs

outputs:
- name: stdout
match: event
8 changes: 8 additions & 0 deletions examples/processor_calyptia/logs_empty_key/script.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
function process_logs(tag, ts, log)
for k, v in pairs(log) do
if v == "" then
log[k] = 'EMPTY'
end
end
return MODIFY, ts, log
end
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}], {"extra_data"=>["null", false, "false", true, nil, -4, "ba4", 4, 1]}]
23 changes: 23 additions & 0 deletions examples/processor_calyptia/logs_modify_key/fluent-bit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
service:
flush: 0.2
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: logs
processors:
logs:
- name: calyptia
script: script.lua
call: process_logs
# A JSON/YAML object can be passed as "opts" to the script.
# See "script.lua" for an example of how it can be used
opts:
key: extra_data
value: ["null", false, "false", true, null, -4, ba4, 4., 1]

outputs:
- name: stdout
match: event
9 changes: 9 additions & 0 deletions examples/processor_calyptia/logs_modify_key/script.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Load the "opts" object passed in fluent-bit.yaml config into a local
-- variable
local opts = ...

function process_logs(tag, ts, log)
log.event_type = nil
log[opts.key] = opts.value
return MODIFY, ts, log
end
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"hello"=>"world"}], {"event_type"=>"some logs"}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 1
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: logs
processors:
logs:
- name: calyptia
script: script.lua
call: process_logs

outputs:
- name: stdout
match: event
3 changes: 3 additions & 0 deletions examples/processor_calyptia/logs_modify_metadata/script.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
function process_logs(tag, ts, log)
return MODIFY, ts, log, {hello = 'world'}
end
4 changes: 4 additions & 0 deletions examples/processor_calyptia/logs_split_drop/expected_output
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[0] event: [[0.000000000, {"index"=>6}], {"event_type"=>"some logs 0"}]
[1] event: [[10.000000000, {"index"=>6}], {"event_type"=>"some logs 0"}]
[0] event: [[2.000000000, {"index"=>8}], {"event_type"=>"some logs 2"}]
[1] event: [[12.000000000, {"index"=>8}], {"event_type"=>"some logs 2"}]
18 changes: 18 additions & 0 deletions examples/processor_calyptia/logs_split_drop/fluent-bit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 1
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: logs
processors:
logs:
- name: calyptia
script: script.lua
call: process_logs

outputs:
- name: stdout
match: event
11 changes: 11 additions & 0 deletions examples/processor_calyptia/logs_split_drop/script.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
local i = 0

function process_logs(tag, ts, log)
ts = i
log.event_type = log.event_type .. " " .. i
i = i + 1
if i % 2 == 0 then
return DROP
end
return MODIFY, {ts, ts + 10}, {log, log}, {index = i + 5}
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
1970-01-01T00:00:00.000000000Z kubernetes_network_load_counter = 3
1970-01-01T00:00:00.000000000Z kubernetes_network_load_counter{app="cmetrics"} = 1
1970-01-01T00:00:00.000000000Z kubernetes_network_load_counter{app="test"} = 12.15
1970-01-01T00:00:00.000000000Z kubernetes_network_load_gauge = 1
1970-01-01T00:00:00.000000000Z kubernetes_network_load_gauge{hello="world"} = 42
1970-01-01T00:00:00.000000000Z k8s_disk_load_summary = { quantiles = { 0.1=1.1, 0.2=2.2, 0.3=3.3, 0.4=4.4, 0.5=5.5 }, sum=51.6129, count=10 }
1970-01-01T00:00:00.000000000Z k8s_disk_load_summary{my_label="my_val"} = { quantiles = { 0.1=11.11, 0.2=0, 0.3=33.33, 0.4=44.44, 0.5=55.55 }, sum=51.6129, count=10 }
1970-01-01T00:00:00.000000000Z k8s_network_load_histogram = { buckets = { 0.05=2, 5=3, 10=4, +Inf=5 }, sum=1013.02, count=5 }
1970-01-01T00:00:00.000000000Z k8s_network_load_histogram{my_label="my_val"} = { buckets = { 0.05=2, 5=3, 10=4, +Inf=5 }, sum=1013.02, count=5 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 0.2
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: metrics
processors:
metrics:
- name: calyptia
script: script.lua
call: process_metrics

outputs:
- name: stdout
match: event
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
function process_metrics(tag, ts, metric)
for _, sample in ipairs(metric.metrics) do
sample.timestamp = 0
end

if metric.name == 'kubernetes_network_load_counter' then
for _, sample in ipairs(metric.metrics) do
if sample.labels then
sample.labels.hostname = nil
end
end
end
if metric.name == 'kubernetes_network_load_gauge' then
table.insert(metric.metrics, {
timestamp = 0,
value = 42,
labels = {
hello = 'world'
}
})
end
return MODIFY, ts, metric
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_counter = 3
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_counter{app="cmetrics"} = 1
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_counter{app="test"} = 12.15
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_gauge = 1
1970-01-01T00:00:00.000000100Z metric_k8s_disk_load_summary = { quantiles = { 0.1=1.1, 0.2=2.2, 0.3=3.3, 0.4=4.4, 0.5=5.5 }, sum=51.6129, count=10 }
1970-01-01T00:00:00.000000100Z metric_k8s_disk_load_summary{my_label="my_val"} = { quantiles = { 0.1=11.11, 0.2=0, 0.3=33.33, 0.4=44.44, 0.5=55.55 }, sum=51.6129, count=10 }
1970-01-01T00:00:00.000000100Z metric_k8s_network_load_histogram = { buckets = { 0.05=2, 5=3, 10=4, +Inf=5 }, sum=1013.02, count=5 }
1970-01-01T00:00:00.000000100Z metric_k8s_network_load_histogram{my_label="my_val"} = { buckets = { 0.05=2, 5=3, 10=4, +Inf=5 }, sum=1013.02, count=5 }
18 changes: 18 additions & 0 deletions examples/processor_calyptia/metrics_modify_name/fluent-bit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 0.2
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: metrics
processors:
metrics:
- name: calyptia
script: script.lua
call: process_metrics

outputs:
- name: stdout
match: event
14 changes: 14 additions & 0 deletions examples/processor_calyptia/metrics_modify_name/script.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
function process_metrics(tag, ts, metric)
for _, sample in ipairs(metric.metrics) do
sample.timestamp = 100
end
if metric.name == 'kubernetes_network_load_counter' then
for _, sample in ipairs(metric.metrics) do
if sample.labels then
sample.labels.hostname = nil
end
end
end
metric.name = 'metric_' .. metric.name
return MODIFY, ts, metric
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_counter = 3
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_counter{app="cmetrics"} = 1
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_counter{app="test"} = 12.15
1970-01-01T00:00:00.000000100Z metric_kubernetes_network_load_gauge = 1
1970-01-01T00:00:00.000000100Z metric_k8s_disk_load_summary = { quantiles = { 0.1=1.1, 0.2=2.2, 0.3=3.3, 0.4=4.4, 0.5=5.5 }, sum=51.6129, count=10 }
1970-01-01T00:00:00.000000100Z metric_k8s_disk_load_summary{my_label="my_val"} = { quantiles = { 0.1=11.11, 0.2=0, 0.3=33.33, 0.4=44.44, 0.5=55.55 }, sum=51.6129, count=10 }
1970-01-01T00:00:00.000000100Z metric_k8s_network_load_histogram = { buckets = { 0.05=2, 5=3, 10=4, +Inf=5 }, sum=1013.02, count=5 }
1970-01-01T00:00:00.000000100Z metric_k8s_network_load_histogram{my_label="my_val"} = { buckets = { 0.05=2, 5=3, 10=4, +Inf=5 }, sum=1013.02, count=5 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 0.2
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: metrics

outputs:
- name: stdout
match: event
processors:
metrics:
- name: calyptia
script: script.lua
call: process_metrics
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
function process_metrics(tag, ts, metric)
for _, sample in ipairs(metric.metrics) do
sample.timestamp = 100
end
if metric.name == 'kubernetes_network_load_counter' then
for _, sample in ipairs(metric.metrics) do
if sample.labels then
sample.labels.hostname = nil
end
end
end
metric.name = 'metric_' .. metric.name
return MODIFY, ts, metric
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
|-------------------- RESOURCE SPAN --------------------|
resource:
- attributes:
- service.name: 'Fluent Bit Test Service'
- dropped_attributes_count: 10
schema_url: https://www.google2.com
[scope_span]
instrumentation scope:
- name : scope-span
- version : d.e.f
- dropped_attributes_count: 5
- attributes:

schema_url: https://www.google.com
[spans]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 0.2
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: traces
processors:
traces:
- name: calyptia
script: script.lua
call: process_traces

outputs:
- name: stdout
match: event
11 changes: 11 additions & 0 deletions examples/processor_calyptia/traces_modify_metadata/script.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
function process_traces(tag, ts, span, metadata)
metadata.resourceSpan.resource.droppedAttributesCount = 10
metadata.resourceSpan.schemaUrl = 'https://www.google2.com'
metadata.scopeSpan.schemaUrl = 'https://www.google.com'
metadata.scopeSpan.scope.name = 'scope-span'
metadata.scopeSpan.scope.version = 'd.e.f'
metadata.scopeSpan.scope.droppedAttributesCount = 5
span.startTimeUnixNano = 10
span.endTimeUnixNano = 20
return MODIFY, ts, span, metadata
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
|-------------------- RESOURCE SPAN --------------------|
resource:
- attributes:
- service.name: 'Fluent Bit Test Service'
- dropped_attributes_count: 10
schema_url: https://www.google2.com
[scope_span]
instrumentation scope:
- name : scope-span
- version : d.e.f
- dropped_attributes_count: 5
- attributes:

schema_url: https://www.google.com
[spans]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
service:
flush: 0.2
log_level: error

pipeline:
inputs:
- name: event_type
tag: event
type: traces

outputs:
- name: stdout
match: event
processors:
traces:
- name: calyptia
script: script.lua
call: process_traces
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
function process_traces(tag, ts, span, metadata)
metadata.resourceSpan.resource.droppedAttributesCount = 10
metadata.resourceSpan.schemaUrl = 'https://www.google2.com'
metadata.scopeSpan.schemaUrl = 'https://www.google.com'
metadata.scopeSpan.scope.name = 'scope-span'
metadata.scopeSpan.scope.version = 'd.e.f'
metadata.scopeSpan.scope.droppedAttributesCount = 5
span.startTimeUnixNano = 10
span.endTimeUnixNano = 20
return MODIFY, ts, span, metadata
end
Loading
Loading