filter_lua: Add chunk mode for processing multiple records #1310

Open
wants to merge 1 commit into
base: master
180 changes: 180 additions & 0 deletions pipeline/filters/lua.md
@@ -21,6 +21,7 @@ The plugin supports the following configuration parameters:
| time\_as\_table | By default when the Lua script is invoked, the record timestamp is passed as a *floating number* which might lead to precision loss when it is converted back. If you desire timestamp precision, enabling this option will pass the timestamp as a Lua table with keys `sec` for seconds since epoch and `nsec` for nanoseconds. |
| code | Inline Lua code to run instead of loading a script from a path via `script`. |
| enable_flb_null | If enabled, null values are converted to `flb_null` in Lua. This prevents key/value pairs from being removed, because assigning `nil` to a key removes it from a Lua table. Default is false. |
| chunk_mode | If enabled, a whole chunk is passed to the Lua script as a table of timestamps and records. This can be used, for example, for parallel execution inside Lua. Default is false. |

## Getting Started <a id="getting_started"></a>

@@ -348,3 +349,182 @@ Configuration to get istio logs and apply response code filter to them.
#### Output

In the output only the messages with response code 0 or greater than 399 are shown.

### Chunk mode

The Lua filter supports a `chunk_mode` in which the callback receives a whole chunk of records at once instead of one record per call. This is useful for cases such as parallelization, for example with Lua Lanes.

#### Function Signature

Lua functions used in this mode accept only two arguments, the tag and a table of records:

```
function process_records(tag, records)
```

#### Configuration

The configuration for the Lua filter using chunk mode looks like this:

```
[FILTER]
    Name          lua
    Match         my_logs
    script        lanes_example.lua
    call          process_records
    chunk_mode    On
    time_as_table On
```

#### Note

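- This mode currently supports only the `time_as_table` timestamp format: timestamps are passed as tables with `sec` and `nsec` keys.
- Records are always emitted. Unlike the regular callback, the function does not set a return code.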
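
Because timestamps arrive as tables with `sec` and `nsec` keys, comparing or converting them takes a little care. The following is a minimal sketch, not part of Fluent Bit: the helper names `ts_to_number` and `ts_before` are hypothetical and only assume the `sec`/`nsec` layout described above.

```lua
-- Hypothetical helpers (not provided by Fluent Bit) for timestamps that
-- are passed as tables with `sec` and `nsec` keys.

-- Convert a timestamp table to fractional seconds.
-- Note: the result is a float, so nanosecond precision may be lost.
function ts_to_number(ts)
    return ts.sec + ts.nsec / 1e9
end

-- Return true if timestamp `a` is strictly earlier than timestamp `b`.
-- The integer fields are compared directly, so no precision is lost.
function ts_before(a, b)
    if a.sec ~= b.sec then
        return a.sec < b.sec
    end
    return a.nsec < b.nsec
end
```

Comparing the two fields directly avoids the precision loss that motivates `time_as_table` in the first place; the float conversion is only suitable where approximate values are acceptable.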

#### Return Table Format

The returned table must keep the same format as the input: a list of entries, each pairing a `timestamp` table with a `record` table.

| Timestamp | Record |
|--------------------------|---------------------------------------|
| `{ sec = <timestamp_sec>, nsec = <timestamp_nsec> }` | `{ message = "your_dummy_log_message" }` |
| `{ sec = <timestamp_sec>, nsec = <timestamp_nsec> }` | `{ message = "your_dummy_log_message" }` |

Please refer to the following examples to see how to build the return table.
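
As a minimal sketch of building a fresh return table, the callback below copies each entry into a new list and adds one key to every record. The function name `add_tag` and the `source_tag` key are hypothetical and chosen only for illustration.

```lua
-- Sketch of a chunk-mode callback that builds a new return table.
-- `add_tag` and the `source_tag` key are illustrative names only.
function add_tag(tag, records)
    local results = {}
    -- Treat a missing chunk as empty so the function always returns a table
    for _, row in ipairs(records or {}) do
        local record = row.record
        record.source_tag = tag
        -- Each entry pairs the original timestamp table with the record
        table.insert(results, { timestamp = row.timestamp, record = record })
    end
    return results
end
```

The timestamp tables are passed through untouched, so each returned entry keeps the `timestamp`/`record` layout shown in the table above.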

#### Input Table Example

```lua
function process_records(tag, records)
    if records and type(records) == "table" then
        -- Each entry pairs a timestamp table with a record table
        for i, record_row in ipairs(records) do
            local timestamp = record_row.timestamp
            local record = record_row.record

            print("Timestamp entry:", timestamp.sec, timestamp.nsec)
            print("Record entry:", record.message)
        end
    else
        print("Error: Invalid 'records' table or nil")
    end
    -- Return the chunk unchanged
    return records
end
```


#### Parallelization Example

Make sure Lua Lanes is installed (for example, `luarocks install lanes`) and set the path to the module appropriately in your Lua script (see `luarocks show lanes`).
To inject multiple dummy messages at once, adjust the `Copies` parameter of the `dummy` input.
To keep things simple, this example creates a new thread for every record in the chunk.

`fluent-bit.conf`:

```
[SERVICE]
    Flush        5
    Log_Level    debug
    Daemon       off
    HTTP_Server  Off

[INPUT]
    Name    dummy
    Tag     my_logs
    Rate    1
    Copies  2
    Dummy   {"message":"your_dummy_log_message"}

[FILTER]
    Name          lua
    Match         my_logs
    script        lanes_example.lua
    call          threads
    chunk_mode    On
    time_as_table On

[OUTPUT]
    Name   stdout
    Match  my_logs
```

`lanes_example.lua`:

```lua
-- Specify path to Lua Lanes module
-- Install via: 'luarocks install lanes'
local lanes_path = "/usr/local/share/lua/5.1/lanes.lua"

-- Load Lanes lib
local lanes = assert(loadfile(lanes_path))().configure()

-- Lua function that will be executed as separate threads
local function process_log(timestamp, record)
    -- Add your CPU intensive code here
    print("Timestamp:", timestamp.sec, timestamp.nsec)
    print("Record:", record.message)

    record.message = "Modified"
    return timestamp, record
end

-- Entry function
function threads(tag, records)
    print("LUA ")
    local thread_handles = {}
    local results = {}
    if records and type(records) == "table" then
        print("Number of incoming records:", #records)
        for i, log_event in ipairs(records) do
            local timestamp = log_event.timestamp
            local record = log_event.record

            -- Use lanes.gen to create a new thread
            local thread = lanes.gen("*", process_log)(timestamp, record)

            -- Store the thread handle
            table.insert(thread_handles, thread)
        end
        -- Wait for all threads to finish
        for _, thread in ipairs(thread_handles) do
            -- Indexing the handle blocks until the thread is done and
            -- yields the values it returned (timestamp first, then record)
            local modified_record = thread[2]
            local modified_timestamp = thread[1]
            local result = {timestamp = modified_timestamp, record = modified_record}
            table.insert(results, result)
        end
        print("All threads returned")
    else
        print("Error: Invalid or nil 'records' table.")
    end

    return results
end
```

You should see output similar to the following:

```
...
LUA
Number of incoming records: 2
Timestamp: 1707308482 97855348
Record: your_dummy_log_message
Timestamp: 1707308482 97664060
Record: your_dummy_log_message
All threads returned
...
[0] my_logs: [[1707308479.115073794, {}], {"message"=>"Modified"}]
[1] my_logs: [[1707308479.147705065, {}], {"message"=>"Modified"}]
[2] my_logs: [[1707308480.097053227, {}], {"message"=>"Modified"}]
[3] my_logs: [[1707308480.097306893, {}], {"message"=>"Modified"}]
[4] my_logs: [[1707308481.097325851, {}], {"message"=>"Modified"}]
[5] my_logs: [[1707308481.097515912, {}], {"message"=>"Modified"}]
[6] my_logs: [[1707308482.097664060, {}], {"message"=>"Modified"}]
[7] my_logs: [[1707308482.097855348, {}], {"message"=>"Modified"}]
...
```
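
The example above starts one lane per record. A possible refinement, sketched here under assumptions, is to group the chunk into a small number of batches so that each worker handles several records. The names `split_into_batches`, `process_batch`, `batched`, and the value of `BATCH_COUNT` are hypothetical. The batches are processed sequentially here so the sketch runs without Lua Lanes; handing `process_batch` to `lanes.gen` is left as an untested assumption.

```lua
-- Hypothetical upper bound on the number of batches (and thus workers)
local BATCH_COUNT = 4

-- Split `records` into at most `n` contiguous batches of near-equal size
function split_into_batches(records, n)
    local batches = {}
    local total = #records
    if total == 0 or n < 1 then
        return batches
    end
    local size = math.ceil(total / n)
    for start = 1, total, size do
        local batch = {}
        for i = start, math.min(start + size - 1, total) do
            batch[#batch + 1] = records[i]
        end
        batches[#batches + 1] = batch
    end
    return batches
end

-- Process one batch; with Lua Lanes this is the function that would be
-- run in a separate lane (assumption, not shown here)
function process_batch(batch)
    for _, row in ipairs(batch) do
        row.record.message = "Modified"
    end
    return batch
end

-- Entry function: processes batches one after another and flattens the
-- results back into a single list, preserving the original order
function batched(tag, records)
    local results = {}
    for _, batch in ipairs(split_into_batches(records or {}, BATCH_COUNT)) do
        for _, row in ipairs(process_batch(batch)) do
            results[#results + 1] = { timestamp = row.timestamp, record = row.record }
        end
    end
    return results
end
```

Bounding the number of batches keeps the number of threads independent of the chunk size, which may matter when a chunk holds many records.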