Fluentd supports pluggable, customizable formats for output plugins. The plugin filenames starting with formatter_
are registered as Formatter Plugins.
See Plugin Base Class API for more details on the common APIs of all the plugins.
Following is an example of a custom formatter (formatter_my_csv.rb
) that outputs events in CSV format. It takes a required parameter called csv_fields
and outputs the fields. It assumes that the values of the fields are valid CSV fields.
require 'fluent/plugin/formatter'
module Fluent::Plugin
class MyCSVFormatter < Formatter
# Register MyCSVFormatter as 'my_csv'.
Fluent::Plugin.register_formatter('my_csv', self)
config_param :csv_fields, :array, value_type: :string
# This method does further processing. Configuration parameters can be
# accessed either via `conf` hash or member variables.
def configure(conf)
super
end
# This is the method that formats the data output.
def format(tag, time, record)
values = []
# Look up each required field and collect them from the record
@csv_fields.each do |field|
v = record[field]
unless v
log.error "#{field} is missing."
end
values << v.to_s
end
# Output by joining the fields with a comma
values.join(',')
end
end
end
Save this as formatter_my_csv.rb
in a loadable plugin path.
With out_file
output plugin:
# ...
<match test>
@type file
path /path/to/output/file
<format>
@type my_csv
csv_fields k1,k2
</format>
</match>
For a matched record e.g. {"k1": 100, "k2": 200}
, the output CSV file would look like this:
100,200
Formatter plugins are designed to be used from other plugins, like Input, Filter and Output. Formatter plugin helper helps achieve this:
# in class definition
helpers :formatter
# in #configure
@formatter = formatter_create(type: 'json')
# in #filter, #format or ...
es.each do |time, record|
row = @formatter.format(@tag, time, record)
# ...
end
See Formatter Plugin Helper API for details.
The formatter plugins implement filter
method to format the input Hash
record as a String
object.
It receives an event represented by tag
, time
and record
; and, after formatting returns a String
object.
Formatter plugins must implement this method.
Fluentd formatter plugin has one or more points to be tested. Others (parsing configurations, controlling buffers, retries, flushes and many others) are controlled by the Fluentd core.
Fluentd also provides test driver for plugins. You can write tests for your own plugins very easily:
# test/plugin/test_formatter_your_own.rb
require 'test/unit'
require 'fluent/test/driver/formatter'
# your own plugin
require 'fluent/plugin/formatter_your_own'
class FormatterYourOwnTest < Test::Unit::TestCase
def setup
# common setup
end
CONFIG = %[
fields a,b,c
]
def create_driver(conf = CONF)
Fluent::Test::Driver::Formatter.new(Fluent::Plugin::YourOwnFormatter).configure(conf)
end
sub_test_case 'configured with invalid configurations' do
test 'empty' do
assert_raise(Fluent::ConfigError) do
create_driver('')
end
end
# ...
end
sub_test_case 'plugin will format record' do
test 'record has a field' do
d = create_driver(CONFIG)
tag = 'test'
time = event_time
record = { 'message' => 'This is message' }
formatted = d.instance.format(tag, time, record)
expected = '...'
assert_equal(expected, formatted)
end
end
end
Testing for formatter plugins is mainly for:
- Validation of configuration parameters (i.e.
#configure
) - Validation of the formatted records
To make testing easy, the plugin test driver provides a logger and the functionality to override the system, parser and other configurations.
The lifecycle of the plugin and its test driver is:
- Instantiate plugin driver which then instantiates the plugin
- Configure plugin
- Run test code
- Assert results of tests by data provided by the driver
For:
- configuration tests, repeat steps # 1-2
- full feature tests, repeat step # 1-4
For more details, see Testing API for Plugins.
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.