Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC, Do not merge] input_chunk: split incoming buffer when it's too big #9385

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ struct flb_config {
int is_running; /* service running ? */
double flush; /* Flush timeout */

/*
* Maximum grace time on shutdown. If set to -1, the engine will
/*
* Maximum grace time on shutdown. If set to -1, the engine will
* shutdown when all remaining tasks are flushed
*/
int grace;
int grace;
int grace_count; /* Count of grace shutdown tries */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* convert nan to null ? */
Expand Down Expand Up @@ -227,6 +227,7 @@ struct flb_config {
char *storage_bl_mem_limit; /* storage backlog memory limit */
struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */
int storage_trim_files; /* enable/disable file trimming */
size_t storage_chunk_max_size; /* The max chunk size */

/* Embedded SQL Database support (SQLite3) */
#ifdef FLB_HAVE_SQLDB
Expand Down Expand Up @@ -354,15 +355,16 @@ enum conf_type {
#define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6"

/* Storage / Chunk I/O */
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \
"storage.delete_irrecoverable_chunks"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
"storage.delete_irrecoverable_chunks"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size"

/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
Expand Down
6 changes: 3 additions & 3 deletions include/fluent-bit/flb_input_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@

int flb_input_log_append(struct flb_input_instance *ins,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
void *buf, size_t buf_size);

int flb_input_log_append_records(struct flb_input_instance *ins,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
void *buf, size_t buf_size);

int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins,
size_t processor_starting_stage,
const char *tag,
size_t tag_len,
const void *buf,
void *buf,
size_t buf_size);
#endif
3 changes: 2 additions & 1 deletion plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,9 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
file->tag_len,
file->sl_log_event_encoder->output_buffer,
file->sl_log_event_encoder->output_length);

flb_log_event_encoder_reset(file->sl_log_event_encoder);
flb_free(file->sl_log_event_encoder->output_buffer);
flb_log_event_encoder_claim_internal_buffer_ownership(file->sl_log_event_encoder);
}
}
else if (file->skip_next) {
Expand Down
5 changes: 5 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/flb_bucket_queue.h>
#include <fluent-bit/flb_input_chunk.h>

const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL";

Expand Down Expand Up @@ -154,6 +155,9 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_TRIM_FILES,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_trim_files)},
{FLB_CONF_STORAGE_CHUNK_MAX_SIZE,
FLB_CONF_TYPE_INT,
offsetof(struct flb_config, storage_chunk_max_size)},

/* Coroutines */
{FLB_CONF_STR_CORO_STACK_SIZE,
Expand Down Expand Up @@ -278,6 +282,7 @@ struct flb_config *flb_config_init()
config->storage_path = NULL;
config->storage_input_plugin = NULL;
config->storage_metrics = FLB_TRUE;
config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE;

config->sched_cap = FLB_SCHED_CAP;
config->sched_base = FLB_SCHED_BASE;
Expand Down
5 changes: 4 additions & 1 deletion src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/stream_processor/flb_sp.h>
#include <fluent-bit/flb_ring_buffer.h>
#include <fluent-bit/flb_log_event.h>
#include <chunkio/chunkio.h>
#include <monkey/mk_core.h>

Expand Down Expand Up @@ -1124,7 +1125,9 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
}

if (id >= 0) {
if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk) ||
(flb_input_chunk_get_real_size(ic) + chunk_size) >
FLB_INPUT_CHUNK_FS_MAX_SIZE) {
ic = NULL;
}
else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
Expand Down
149 changes: 142 additions & 7 deletions src/flb_input_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,138 @@
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

#include <monkey/mk_core/mk_list.h>

/*
 * One piece of an incoming log buffer, kept on an mk_list while the
 * pieces are appended to the input chunk one by one.
 */
struct buffer_entry {
/* data carried by this entry; buffer_entry_destroy() releases it with flb_free() */
char *buf;
/* number of bytes stored in 'buf' */
size_t buf_size;
/* link used to chain the entry into a struct mk_list */
struct mk_list _head;
};

/*
 * Allocate a buffer_entry that wraps 'buf' (of 'buf_size' bytes).
 *
 * The data is not copied: the entry only stores the pointer, and
 * buffer_entry_destroy() will later release it with flb_free().
 *
 * Returns the new entry, or NULL if the allocation failed.
 */
static struct buffer_entry *new_buffer_entry(void *buf, size_t buf_size)
{
    struct buffer_entry *new_entry;

    new_entry = flb_malloc(sizeof(struct buffer_entry));
    if (new_entry == NULL) {
        flb_errno();
        return NULL;
    }

    new_entry->buf = buf;
    new_entry->buf_size = buf_size;

    /*
     * Leave the link in a defined (self-referencing) state so that
     * unlinking an entry which was never added to a list is harmless.
     */
    mk_list_init(&new_entry->_head);

    return new_entry;
}

/*
 * Release a buffer_entry: free the data it carries, detach it from the
 * list it is linked into and free the entry itself. A NULL entry is ignored.
 */
static void buffer_entry_destroy(struct buffer_entry *entry)
{
    if (entry == NULL) {
        return;
    }

    if (entry->buf != NULL) {
        flb_free(entry->buf);
    }

    mk_list_del(&entry->_head);
    flb_free(entry);
}

static int split_buffer_entry(struct buffer_entry *entry,
struct mk_list *entries,
int buf_entry_max_size)
{
int ret;
int encoder_result;
void *tmp_encoder_buf;
size_t tmp_encoder_buf_size;
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
int entries_processed;
struct buffer_entry *new_buffer;

ret = flb_log_event_decoder_init(&log_decoder, entry->buf, entry->buf_size);
if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_error("Log event decoder initialization error : %d", ret);

return FLB_FALSE;
}

ret = flb_log_event_encoder_init(&log_encoder,
FLB_LOG_EVENT_FORMAT_DEFAULT);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_error("Log event encoder initialization error : %d", ret);

flb_log_event_decoder_destroy(&log_decoder);

return FLB_FALSE;
}

entries_processed = 0;
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
encoder_result = flb_log_event_encoder_begin_record(&log_encoder);
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_set_timestamp(
&log_encoder, &log_event.timestamp);
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_metadata_from_msgpack_object(
&log_encoder, log_event.metadata);
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
&log_encoder, log_event.body);
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_commit_record(&log_encoder);
}

if (encoder_result != FLB_EVENT_ENCODER_SUCCESS) {
flb_error("log event encoder error : %d", encoder_result);
continue;
}

if (log_encoder.output_length >= buf_entry_max_size) {
tmp_encoder_buf_size = log_encoder.output_length;
tmp_encoder_buf = log_encoder.output_buffer;
flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size);
mk_list_add(&new_buffer->_head, entries);
}

entries_processed++;
}

if (log_encoder.output_length >= 0) {
tmp_encoder_buf_size = log_encoder.output_length;
tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size);
memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size);
new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size);
mk_list_add(&new_buffer->_head, entries);
}

flb_log_event_encoder_destroy(&log_encoder);
flb_log_event_decoder_destroy(&log_decoder);
return FLB_TRUE;
}


static int input_log_append(struct flb_input_instance *ins,
size_t processor_starting_stage,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
void *buf, size_t buf_size)
{
int ret;
int processor_is_active;
void *out_buf = (void *) buf;
size_t out_size = buf_size;
struct mk_list buffers;
struct mk_list *head;
struct mk_list *tmp;
struct buffer_entry *start_buffer;
struct buffer_entry *iter_buffer;

processor_is_active = flb_processor_is_active(ins->processor);
if (processor_is_active) {
Expand Down Expand Up @@ -68,9 +189,23 @@ static int input_log_append(struct flb_input_instance *ins,
}
}

ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, out_buf, out_size);

if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
mk_list_init(&buffers);
start_buffer = new_buffer_entry(buf, buf_size);
split_buffer_entry(start_buffer, &buffers, ins->config->storage_chunk_max_size);
flb_free(start_buffer);
mk_list_foreach_safe(head, tmp, &buffers) {
iter_buffer = mk_list_entry(head, struct buffer_entry, _head);
records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size);
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len,
iter_buffer->buf, iter_buffer->buf_size);
buffer_entry_destroy(iter_buffer);
}
} else {
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, buf, buf_size);
}

if (processor_is_active && buf != out_buf) {
flb_free(out_buf);
Expand All @@ -81,7 +216,7 @@ static int input_log_append(struct flb_input_instance *ins,
/* Take a msgpack serialized record and enqueue it as a chunk */
int flb_input_log_append(struct flb_input_instance *ins,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
void *buf, size_t buf_size)
{
int ret;
size_t records;
Expand All @@ -96,7 +231,7 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins,
size_t processor_starting_stage,
const char *tag,
size_t tag_len,
const void *buf,
void *buf,
size_t buf_size)
{
return input_log_append(ins,
Expand All @@ -112,7 +247,7 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins,
int flb_input_log_append_records(struct flb_input_instance *ins,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
void *buf, size_t buf_size)
{
int ret;

Expand Down
Loading