From d52d17e55b264374454c662788061ce4333f1437 Mon Sep 17 00:00:00 2001 From: Richard Treu Date: Mon, 12 Feb 2024 16:31:09 +0100 Subject: [PATCH] filter_lua: Add chunk mode for processing multiple records Documentation for https://github.com/fluent/fluent-bit/pull/8478 Signed-off-by: Richard Treu --- pipeline/filters/lua.md | 180 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/pipeline/filters/lua.md b/pipeline/filters/lua.md index 2ef8e9872..e5c66748c 100644 --- a/pipeline/filters/lua.md +++ b/pipeline/filters/lua.md @@ -21,6 +21,7 @@ The plugin supports the following configuration parameters: | time\_as\_table | By default when the Lua script is invoked, the record timestamp is passed as a *floating number* which might lead to precision loss when it is converted back. If you desire timestamp precision, enabling this option will pass the timestamp as a Lua table with keys `sec` for seconds since epoch and `nsec` for nanoseconds. | | code | Inline LUA code instead of loading from a path via `script`. | | enable_flb_null| If enabled, null will be converted to flb_null in Lua. It is useful to prevent removing key/value since nil is a special value to remove key value from map in Lua. Default is false. | +| chunk_mode | If enabled, a whole chunk will be sent to Lua script as a table of timestamps and records. It may be used for e.g. parallel execution inside Lua. Default is false. | ## Getting Started @@ -348,3 +349,182 @@ Configuration to get istio logs and apply response code filter to them. #### Output In the output only the messages with response code 0 or greater than 399 are shown. + +### Chunk mode + +There is a `chunk_mode` for the Lua filter in Fluent Bit. This mode can be useful for cases like parallelization, particularly when utilizing Lua lanes. + +#### Function Signature + +The Lua functions associated with this mode accept only two arguments: + +``` +function process_records(tag, records) +``` + +#### Configuration + +The configuration for the Lua filter using chunk mode looks like this: + +``` +[FILTER] + Name lua + Match my_logs + script lanes_example.lua + call process_records + chunk_mode On + time_as_table On +``` +#### Note + +- This mode currently only supports `time_as_table` by default. +- Records are always emitted; there is no return code to be set. + +#### Return Table Format + +The return table must maintain this format, i.e., a table of timestamp and record pairs. + +| Timestamp | Record | +|--------------------------|---------------------------------------| +| { | { | +| sec: , | message: "your_dummy_log_message" | +| nsec: | } | +| } | | +| | | +| { | { | +| sec: , | message: "your_dummy_log_message" | +| nsec: | } | +| } | | + +Please refer to the following examples to see how to build the return table. + +#### Input Table Example + +``` +function process_records(tag, records) + if records and type(records) == "table" then + for i, record_row in ipairs(records) do + local timestamp = record_row.timestamp + local record = record_row.record + + print("Timestamp entry:", timestamp.sec, timestamp.nsec) + print("Record entry:", record.message) + end + else + print("Error: Invalid 'records' table or nil") + end + return records +end +``` + + +#### Parallelization Example + +Ensure that you have Lua lanes installed (e.g. `luarocks install lanes`) and to set the path appropriately (`luarocks show lanes`) in your lua script. +To inject multiple dummy messages at once, you can adjust the `Copies` parameter of the `dummy` input. +Keep in mind that this example will create a new thread for every record in the chunk to keept the example simple. + +``` +fluent-bit.conf: + +[SERVICE] + Flush 5 + Log_Level debug + Daemon off + HTTP_Server Off + +[INPUT] + Name dummy + Tag my_logs + Rate 1 + Copies 2 + Dummy {"message":"your_dummy_log_message"} + +[FILTER] + Name lua + Match my_logs + script lanes_example.lua + call threads + chunk_mode On + time_as_table On + +[OUTPUT] + Name stdout + Match my_logs +``` + +```lua +lanes_example.lua: + +-- Specify path to Lua Lanes module +-- Install via: 'luarocks install lanes' +local lanes_path = "/usr/local/share/lua/5.1/lanes.lua" + +-- Load Lanes lib +local lanes = assert(loadfile(lanes_path))().configure() + +-- Lua function that will be executed as separate threads +local function process_log(timestamp, record) + -- Add your CPU intensive code here + print("Timestamp:", timestamp.sec, timestamp.nsec) + print("Record:", record.message) + + record.message = "Modified" + return timestamp, record +end + +-- Entry function +function threads(tag, records) + print("LUA ") + local thread_handles = {} + local results = {} + if records and type(records) == "table" then + print("Number of incoming records:", #records) + for i, log_event in ipairs(records) do + local timestamp = log_event.timestamp + local record = log_event.record + + -- Use lanes.gen to create a new thread + local thread = lanes.gen("*", process_log)(timestamp, record) + + -- Store the thread handle + table.insert(thread_handles, thread) + end + -- Wait for all threads to finish + for _, thread in ipairs(thread_handles) do + -- Get the result returned by each thread + local modified_record = thread[2] + local modified_timestamp = thread[1] + local result = {timestamp = modified_timestamp, record = modified_record} + table.insert(results, result) + end + print("All threads returned") + else + print("Error: Invalid or nil 'records' table.") + end + + return results +end +``` + +You should get the similar output: +``` +... +LUA +Number of incoming records: 2 +Timestamp: 1707308482 97855348 +Record: your_dummy_log_message +Timestamp: 1707308482 97664060 +Record: your_dummy_log_message +All threads returned +... +[0] my_logs: [[1707308479.115073794, {}], {"message"=>"Modified"}] +[1] my_logs: [[1707308479.147705065, {}], {"message"=>"Modified"}] +[2] my_logs: [[1707308480.097053227, {}], {"message"=>"Modified"}] +[3] my_logs: [[1707308480.097306893, {}], {"message"=>"Modified"}] +[4] my_logs: [[1707308481.097325851, {}], {"message"=>"Modified"}] +[5] my_logs: [[1707308481.097515912, {}], {"message"=>"Modified"}] +[6] my_logs: [[1707308482.097664060, {}], {"message"=>"Modified"}] +[7] my_logs: [[1707308482.097855348, {}], {"message"=>"Modified"}] +... +```