From 3c4004333b5aa65a5c56695c038d212bf81b4d15 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 29 Aug 2024 21:09:02 +0900 Subject: [PATCH 1/7] out_influxdb: Handle signed/unsigned integer as influx's integer of that representation This is because in their line protocol, it needs to add "i" suffix for handling a value as an integer: | Datatype | Element(s) | Description | |:--------:|:------------:|:------------| | Integer | Field values | Signed 64-bit integers (-9223372036854775808 to 9223372036854775807). Specify an integer with a trailing i on the number. Example: 1i | So, we need to add "i" suffix for signed/unsigned integer values. ref: https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_reference/ Signed-off-by: Hiroshi Hatake --- plugins/out_influxdb/influxdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c index b84d59ca9f8..d49fbb81ea1 100644 --- a/plugins/out_influxdb/influxdb.c +++ b/plugins/out_influxdb/influxdb.c @@ -171,11 +171,11 @@ static char *influxdb_format(const char *tag, int tag_len, } else if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { val = tmp; - val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64, v->via.u64); + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 "i", v->via.u64); } else if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { val = tmp; - val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64, v->via.i64); + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64 "i", v->via.i64); } else if (v->type == MSGPACK_OBJECT_FLOAT || v->type == MSGPACK_OBJECT_FLOAT32) { val = tmp; From 4da779b7e0db53670bb755ae91ee56e31dc648ff Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Aug 2024 14:27:11 +0900 Subject: [PATCH 2/7] out_influxdb: Add test_formatter.callback mechanism Signed-off-by: Hiroshi Hatake --- plugins/out_influxdb/influxdb.c | 36 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c index d49fbb81ea1..bd74f2a513a 100644 --- a/plugins/out_influxdb/influxdb.c +++ b/plugins/out_influxdb/influxdb.c @@ -58,15 +58,19 @@ static void influxdb_tsmod(struct flb_time *ts, struct flb_time *dupe, * Convert the internal Fluent Bit data representation to the required one * by InfluxDB. */ -static char *influxdb_format(const char *tag, int tag_len, - const void *data, size_t bytes, size_t *out_size, - struct flb_influxdb *ctx) +static int influxdb_format(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) { int i; int ret; int n_size; uint64_t seq = 0; - char *buf; char *str = NULL; size_t str_size; char tmp[128]; @@ -77,6 +81,7 @@ static char *influxdb_format(const char *tag, int tag_len, struct influxdb_bulk *bulk_body = NULL; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + struct flb_influxdb *ctx = plugin_context; ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); @@ -84,7 +89,7 @@ static char *influxdb_format(const char *tag, int tag_len, flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); - return NULL; + return -1; } /* Create the bulk composer */ @@ -268,8 +273,8 @@ static char *influxdb_format(const char *tag, int tag_len, flb_log_event_decoder_destroy(&log_decoder); + *out_data = bulk->ptr; *out_size = bulk->len; - buf = bulk->ptr; /* * Note: we don't destroy the bulk as we need to keep the allocated @@ -280,7 +285,7 @@ static char *influxdb_format(const char *tag, int tag_len, influxdb_bulk_destroy(bulk_head); influxdb_bulk_destroy(bulk_body); - return buf; + return 0; error: if (bulk != NULL) { @@ -295,7 +300,7 @@ static char *influxdb_format(const char *tag, int tag_len, flb_log_event_decoder_destroy(&log_decoder); - return NULL; + return -1; } static int cb_influxdb_init(struct flb_output_instance *ins, struct flb_config *config, @@ -453,6 +458,7 @@ static void cb_influxdb_flush(struct flb_event_chunk *event_chunk, int is_metric = FLB_FALSE; size_t b_sent; size_t bytes_out; + void *out_buf; char *pack; char tmp[128]; struct mk_list *head; @@ -477,12 +483,17 @@ static void cb_influxdb_flush(struct flb_event_chunk *event_chunk, } else { /* format logs */ - pack = influxdb_format(event_chunk->tag, flb_sds_len(event_chunk->tag), - event_chunk->data, event_chunk->size, - &bytes_out, ctx); - if (!pack) { + ret = influxdb_format(config, i_ins, + ctx, NULL, + event_chunk->type, + event_chunk->tag, flb_sds_len(event_chunk->tag), + event_chunk->data, event_chunk->size, + &out_buf, &bytes_out); + if (ret != 0) { FLB_OUTPUT_RETURN(FLB_ERROR); } + + pack = (char *) out_buf; } /* Get upstream connection */ @@ -677,6 +688,7 @@ struct flb_output_plugin out_influxdb_plugin = { .cb_flush = cb_influxdb_flush, .cb_exit = cb_influxdb_exit, .config_map = config_map, + .test_formatter.callback = influxdb_format, .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS }; From 3d3d5b7b9726ae03f00ad43e4eb7cfc360dfd8e2 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Aug 2024 16:55:15 +0900 Subject: [PATCH 3/7] out_influxdb: Plug a memory leak Signed-off-by: Hiroshi Hatake --- plugins/out_influxdb/influxdb.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c index bd74f2a513a..bf431a2305b 100644 --- a/plugins/out_influxdb/influxdb.c +++ b/plugins/out_influxdb/influxdb.c @@ -580,6 +580,10 @@ static int cb_influxdb_exit(void *data, struct flb_config *config) flb_utils_split_free(ctx->tag_keys); } + if (ctx->seq_name) { + flb_free(ctx->seq_name); + } + flb_upstream_destroy(ctx->u); flb_free(ctx); From 0d4d6bda2e586237ca44d87211ce0d10160c78a9 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Aug 2024 16:55:44 +0900 Subject: [PATCH 4/7] out_influxdb: test: Add influxdb format test cases Signed-off-by: Hiroshi Hatake --- tests/runtime/CMakeLists.txt | 1 + tests/runtime/out_influxdb.c | 327 +++++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100644 tests/runtime/out_influxdb.c diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 9e5cc4670e4..e902f7892ff 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -124,6 +124,7 @@ if(FLB_IN_LIB) endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") + FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c") endif() diff --git a/tests/runtime/out_influxdb.c b/tests/runtime/out_influxdb.c new file mode 100644 index 00000000000..63c711eb1fd --- /dev/null +++ b/tests/runtime/out_influxdb.c @@ -0,0 +1,327 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2024 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "flb_tests_runtime.h" + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_output_num(int num) +{ + pthread_mutex_lock(&result_mutex); + num_output = num; + pthread_mutex_unlock(&result_mutex); +} + +static void clear_output_num() +{ + set_output_num(0); +} + +#define JSON_BASIC "[12345678, {\"key\":\"value\"}]" +static void cb_check_basic(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "key=\"value\""; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +#define JSON_FLOAT "[12345678, {\"float\":1.3}]" +static void cb_check_float_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "float=1.3"; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +#define JSON_INTEGER "[12345678, {\"int\":100}]" +static void cb_check_int_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "int=100i"; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + + +#define JSON_NEGATIVE_INTEGER "[12345678, {\"int\":-200}]" +static void cb_check_negative_int_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "int=-200i"; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +void flb_test_basic() +{ + int ret; + int size = sizeof(JSON_BASIC) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_basic, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_float_value() +{ + int ret; + int size = sizeof(JSON_FLOAT) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_float_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_FLOAT, size); + TEST_CHECK(ret >= 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_integer_value() +{ + int ret; + int size = sizeof(JSON_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_int_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_negative_integer_value() +{ + int ret; + int size = sizeof(JSON_NEGATIVE_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_negative_int_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_NEGATIVE_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test list */ +TEST_LIST = { + {"basic" , flb_test_basic }, + {"float" , flb_test_float_value }, + {"integer" , flb_test_integer_value }, + {"negative_integer" , flb_test_negative_integer_value }, + {NULL, NULL} +}; From f6a45b2bb66d07003284807c878bd6790a418db6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Aug 2024 17:27:06 +0900 Subject: [PATCH 5/7] out_influxdb: Provide a way to turn on to use integer type on influxdb's line protocol Signed-off-by: Hiroshi Hatake --- plugins/out_influxdb/influxdb.c | 20 +++- plugins/out_influxdb/influxdb.h | 3 + tests/runtime/out_influxdb.c | 159 +++++++++++++++++++++++++++++++- 3 files changed, 178 insertions(+), 4 deletions(-) diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c index bf431a2305b..b7086a37938 100644 --- a/plugins/out_influxdb/influxdb.c +++ b/plugins/out_influxdb/influxdb.c @@ -176,11 +176,21 @@ static int influxdb_format(struct flb_config *config, } else if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { val = tmp; - val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 "i", v->via.u64); + if (ctx->use_influxdb_integer) { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 "i", v->via.u64); + } + else { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64, v->via.u64); + } } else if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { val = tmp; - val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64 "i", v->via.i64); + if (ctx->use_influxdb_integer) { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64 "i", v->via.i64); + } + else { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64, v->via.i64); + } } else if (v->type == MSGPACK_OBJECT_FLOAT || v->type == MSGPACK_OBJECT_FLOAT32) { val = tmp; @@ -680,6 +690,12 @@ static struct flb_config_map config_map[] = { "Space separated list of keys that needs to be tagged." }, + { + FLB_CONFIG_MAP_BOOL, "use_integer", "false", + 0, FLB_TRUE, offsetof(struct flb_influxdb, use_influxdb_integer), + "Use influxdb line protocol's integer type suffix." + }, + /* EOF */ {0} }; diff --git a/plugins/out_influxdb/influxdb.h b/plugins/out_influxdb/influxdb.h index c9ff8a383d3..c21145611c8 100644 --- a/plugins/out_influxdb/influxdb.h +++ b/plugins/out_influxdb/influxdb.h @@ -65,6 +65,9 @@ struct flb_influxdb { /* Arbitrary HTTP headers */ struct mk_list *headers; + /* Use line protocol's integer type */ + int use_influxdb_integer; + /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/tests/runtime/out_influxdb.c b/tests/runtime/out_influxdb.c index 63c711eb1fd..ec6383ac618 100644 --- a/tests/runtime/out_influxdb.c +++ b/tests/runtime/out_influxdb.c @@ -123,6 +123,53 @@ static void cb_check_negative_int_value(void *ctx, int ffd, flb_free(out); } +static void cb_check_int_as_float_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *missing_index_line = "int=100i"; + char *index_line = "int=100"; + + set_output_num(1); + + p = strstr(out, missing_index_line); + if (!TEST_CHECK(p == NULL)) { + TEST_MSG("Given:%s", out); + } + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +static void cb_check_negative_int_as_float_value( + void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *missing_index_line = "int=-200i"; + char *index_line = "int=-200"; + + set_output_num(1); + + p = strstr(out, missing_index_line); + if (!TEST_CHECK(p == NULL)) { + TEST_MSG("Given:%s", out); + } + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + void flb_test_basic() { int ret; @@ -217,6 +264,7 @@ void flb_test_float_value() flb_destroy(ctx); } +/* Using integer type */ void flb_test_integer_value() { int ret; @@ -241,6 +289,7 @@ void flb_test_integer_value() out_ffd = flb_output(ctx, (char *) "influxdb", NULL); flb_output_set(ctx, out_ffd, "match", "test", + "use_integer", "true", NULL); /* Enable test mode */ @@ -291,6 +340,7 @@ void flb_test_negative_integer_value() out_ffd = flb_output(ctx, (char *) "influxdb", NULL); flb_output_set(ctx, out_ffd, "match", "test", + "use_integer", "true", NULL); /* Enable test mode */ @@ -317,11 +367,116 @@ void flb_test_negative_integer_value() flb_destroy(ctx); } +/* Not using integer type of line protocol */ +void flb_test_integer_as_float_value() +{ + int ret; + int size = sizeof(JSON_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "use_integer", "false", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_int_as_float_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_negative_integer_as_float_value() +{ + int ret; + int size = sizeof(JSON_NEGATIVE_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "use_integer", "false", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_negative_int_as_float_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_NEGATIVE_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"basic" , flb_test_basic }, {"float" , flb_test_float_value }, - {"integer" , flb_test_integer_value }, - {"negative_integer" , flb_test_negative_integer_value }, + {"int_integer" , flb_test_integer_value }, + {"int_negative_integer" , flb_test_negative_integer_value }, + {"int_integer_as_float" , flb_test_integer_as_float_value }, + {"int_negative_integer_as_float" , flb_test_negative_integer_as_float_value }, {NULL, NULL} }; From 6f6bfcbc39e6e75713d1a0c81e8d929ce9e17ad3 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 4 Sep 2024 12:31:27 +0900 Subject: [PATCH 6/7] out_influxdb: Use add_integer_suffix name for adding integer suffix on the line protocol Signed-off-by: Hiroshi Hatake --- plugins/out_influxdb/influxdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c index b7086a37938..671dd5c16a6 100644 --- a/plugins/out_influxdb/influxdb.c +++ b/plugins/out_influxdb/influxdb.c @@ -691,7 +691,7 @@ static struct flb_config_map config_map[] = { }, { - FLB_CONFIG_MAP_BOOL, "use_integer", "false", + FLB_CONFIG_MAP_BOOL, "add_integer_suffix", "false", 0, FLB_TRUE, offsetof(struct flb_influxdb, use_influxdb_integer), "Use influxdb line protocol's integer type suffix." }, From 66f545885e3d8b4f0de765d4e0394a600ce5f6b3 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 5 Sep 2024 16:37:41 +0900 Subject: [PATCH 7/7] out_influxdb: test: Follow the change of parameter name Signed-off-by: Hiroshi Hatake --- tests/runtime/out_influxdb.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/runtime/out_influxdb.c b/tests/runtime/out_influxdb.c index ec6383ac618..c536d7ea4e0 100644 --- a/tests/runtime/out_influxdb.c +++ b/tests/runtime/out_influxdb.c @@ -289,7 +289,7 @@ void flb_test_integer_value() out_ffd = flb_output(ctx, (char *) "influxdb", NULL); flb_output_set(ctx, out_ffd, "match", "test", - "use_integer", "true", + "add_integer_suffix", "true", NULL); /* Enable test mode */ @@ -340,7 +340,7 @@ void flb_test_negative_integer_value() out_ffd = flb_output(ctx, (char *) "influxdb", NULL); flb_output_set(ctx, out_ffd, "match", "test", - "use_integer", "true", + "add_integer_suffix", "true", NULL); /* Enable test mode */ @@ -392,7 +392,7 @@ void flb_test_integer_as_float_value() out_ffd = flb_output(ctx, (char *) "influxdb", NULL); flb_output_set(ctx, out_ffd, "match", "test", - "use_integer", "false", + "add_integer_suffix", "false", NULL); /* Enable test mode */ @@ -443,7 +443,7 @@ void flb_test_negative_integer_as_float_value() out_ffd = flb_output(ctx, (char *) "influxdb", NULL); flb_output_set(ctx, out_ffd, "match", "test", - "use_integer", "false", + "add_integer_suffix", "false", NULL); /* Enable test mode */