Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_influxdb: Handle signed/unsigned integer as influx's integer of that representation #9301

60 changes: 46 additions & 14 deletions plugins/out_influxdb/influxdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ static void influxdb_tsmod(struct flb_time *ts, struct flb_time *dupe,
* Convert the internal Fluent Bit data representation to the required one
* by InfluxDB.
*/
static char *influxdb_format(const char *tag, int tag_len,
const void *data, size_t bytes, size_t *out_size,
struct flb_influxdb *ctx)
static int influxdb_format(struct flb_config *config,
struct flb_input_instance *ins,
void *plugin_context,
void *flush_ctx,
int event_type,
const char *tag, int tag_len,
const void *data, size_t bytes,
void **out_data, size_t *out_size)
{
int i;
int ret;
int n_size;
uint64_t seq = 0;
char *buf;
char *str = NULL;
size_t str_size;
char tmp[128];
Expand All @@ -77,14 +81,15 @@ static char *influxdb_format(const char *tag, int tag_len,
struct influxdb_bulk *bulk_body = NULL;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
struct flb_influxdb *ctx = plugin_context;

ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", ret);

return NULL;
return -1;
}

/* Create the bulk composer */
Expand Down Expand Up @@ -171,11 +176,21 @@ static char *influxdb_format(const char *tag, int tag_len,
}
else if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
val = tmp;
val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64, v->via.u64);
if (ctx->use_influxdb_integer) {
val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 "i", v->via.u64);
}
else {
val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64, v->via.u64);
}
}
else if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
val = tmp;
val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64, v->via.i64);
if (ctx->use_influxdb_integer) {
val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64 "i", v->via.i64);
}
else {
val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64, v->via.i64);
}
}
else if (v->type == MSGPACK_OBJECT_FLOAT || v->type == MSGPACK_OBJECT_FLOAT32) {
val = tmp;
Expand Down Expand Up @@ -268,8 +283,8 @@ static char *influxdb_format(const char *tag, int tag_len,

flb_log_event_decoder_destroy(&log_decoder);

*out_data = bulk->ptr;
*out_size = bulk->len;
buf = bulk->ptr;

/*
* Note: we don't destroy the bulk as we need to keep the allocated
Expand All @@ -280,7 +295,7 @@ static char *influxdb_format(const char *tag, int tag_len,
influxdb_bulk_destroy(bulk_head);
influxdb_bulk_destroy(bulk_body);

return buf;
return 0;

error:
if (bulk != NULL) {
Expand All @@ -295,7 +310,7 @@ static char *influxdb_format(const char *tag, int tag_len,

flb_log_event_decoder_destroy(&log_decoder);

return NULL;
return -1;
}

static int cb_influxdb_init(struct flb_output_instance *ins, struct flb_config *config,
Expand Down Expand Up @@ -453,6 +468,7 @@ static void cb_influxdb_flush(struct flb_event_chunk *event_chunk,
int is_metric = FLB_FALSE;
size_t b_sent;
size_t bytes_out;
void *out_buf;
char *pack;
char tmp[128];
struct mk_list *head;
Expand All @@ -477,12 +493,17 @@ static void cb_influxdb_flush(struct flb_event_chunk *event_chunk,
}
else {
/* format logs */
pack = influxdb_format(event_chunk->tag, flb_sds_len(event_chunk->tag),
event_chunk->data, event_chunk->size,
&bytes_out, ctx);
if (!pack) {
ret = influxdb_format(config, i_ins,
ctx, NULL,
event_chunk->type,
event_chunk->tag, flb_sds_len(event_chunk->tag),
event_chunk->data, event_chunk->size,
&out_buf, &bytes_out);
if (ret != 0) {
FLB_OUTPUT_RETURN(FLB_ERROR);
}

pack = (char *) out_buf;
}

/* Get upstream connection */
Expand Down Expand Up @@ -569,6 +590,10 @@ static int cb_influxdb_exit(void *data, struct flb_config *config)
flb_utils_split_free(ctx->tag_keys);
}

if (ctx->seq_name) {
flb_free(ctx->seq_name);
}

flb_upstream_destroy(ctx->u);
flb_free(ctx);

Expand Down Expand Up @@ -665,6 +690,12 @@ static struct flb_config_map config_map[] = {
"Space separated list of keys that needs to be tagged."
},

{
FLB_CONFIG_MAP_BOOL, "add_integer_suffix", "false",
0, FLB_TRUE, offsetof(struct flb_influxdb, use_influxdb_integer),
"Use influxdb line protocol's integer type suffix."
},

/* EOF */
{0}
};
Expand All @@ -677,6 +708,7 @@ struct flb_output_plugin out_influxdb_plugin = {
.cb_flush = cb_influxdb_flush,
.cb_exit = cb_influxdb_exit,
.config_map = config_map,
.test_formatter.callback = influxdb_format,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS
};
3 changes: 3 additions & 0 deletions plugins/out_influxdb/influxdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ struct flb_influxdb {
/* Arbitrary HTTP headers */
struct mk_list *headers;

/* Use line protocol's integer type */
int use_influxdb_integer;

/* Upstream connection to the backend server */
struct flb_upstream *u;

Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ if(FLB_IN_LIB)
endif()
FLB_RT_TEST(FLB_OUT_S3 "out_s3.c")
FLB_RT_TEST(FLB_OUT_TD "out_td.c")
FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c")

endif()

Expand Down
Loading
Loading