diff --git a/plugins/out_kinesis_streams/kinesis.c b/plugins/out_kinesis_streams/kinesis.c index 2fb46dc6295..a225f6007f7 100644 --- a/plugins/out_kinesis_streams/kinesis.c +++ b/plugins/out_kinesis_streams/kinesis.c @@ -117,7 +117,30 @@ static int cb_kinesis_init(struct flb_output_instance *ins, if (tmp) { ctx->sts_endpoint = (char *) tmp; } - + /* + * Sets the port number for the Kinesis output plugin. + * + * This function uses the port number already set in the output instance's host structure. + * If the port is not set (0), the default HTTPS port is used. + * + * @param ins The output instance. + * @param ctx The Kinesis output plugin context. + */ + flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port); + + if (ins->host.port >= FLB_KINESIS_MIN_PORT && ins->host.port <= FLB_KINESIS_MAX_PORT) { + ctx->port = ins->host.port; + flb_plg_debug(ins, "Setting port to: %d", ctx->port); + } + else if (ins->host.port == 0) { + ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT; + flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port); + } + else { + flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d", + ins->host.port, FLB_KINESIS_MIN_PORT, FLB_KINESIS_MAX_PORT); + goto error; + } tmp = flb_output_get_property("log_key", ins); if (tmp) { @@ -255,14 +278,14 @@ static int cb_kinesis_init(struct flb_output_instance *ins, ctx->kinesis_client->region = (char *) ctx->region; ctx->kinesis_client->retry_requests = ctx->retry_requests; ctx->kinesis_client->service = "kinesis"; - ctx->kinesis_client->port = 443; + ctx->kinesis_client->port = ctx->port; ctx->kinesis_client->flags = 0; ctx->kinesis_client->proxy = NULL; ctx->kinesis_client->static_headers = &content_type_header; ctx->kinesis_client->static_headers_len = 1; struct flb_upstream *upstream = flb_upstream_create(config, ctx->endpoint, - 443, FLB_IO_TLS, + ctx->port, FLB_IO_TLS, ctx->client_tls); if (!upstream) { flb_plg_error(ctx->ins, "Connection initialization error"); diff --git a/plugins/out_kinesis_streams/kinesis.h b/plugins/out_kinesis_streams/kinesis.h index 72c6b976b07..57a72166617 100644 --- a/plugins/out_kinesis_streams/kinesis.h +++ b/plugins/out_kinesis_streams/kinesis.h @@ -29,6 +29,10 @@ #define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" +#define FLB_KINESIS_DEFAULT_HTTPS_PORT 443 +#define FLB_KINESIS_MIN_PORT 1 +#define FLB_KINESIS_MAX_PORT 65535 + /* buffers used for each flush */ struct flush { /* temporary buffer for storing the serialized event messages */ @@ -92,6 +96,7 @@ struct flb_kinesis { int retry_requests; char *sts_endpoint; int custom_endpoint; + int port; char *profile; /* in this plugin the 'random' partition key is a uuid + fluent tag + timestamp */ diff --git a/tests/runtime/out_kinesis.c b/tests/runtime/out_kinesis.c index da3e925a0f8..c8e44a4c874 100644 --- a/tests/runtime/out_kinesis.c +++ b/tests/runtime/out_kinesis.c @@ -188,6 +188,110 @@ void flb_test_firehose_nonsense_error(void) unsetenv("TEST_PUT_RECORDS_ERROR"); } +void flb_test_kinesis_default_port(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_output_instance *out; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "time_key", "time", NULL); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Get the output instance */ + out = flb_output_get_instance(ctx->config, out_ffd); + TEST_CHECK(out != NULL); + + /* Check if the port is set to the default value */ + const char* port = flb_output_get_property("port", out); + TEST_CHECK(port == NULL || strcmp(port, "443") == 0); + TEST_MSG("Default port should be 443 or not set, but got %s", port ? port : "NULL"); + + flb_stop(ctx); + flb_destroy(ctx); +} + + +void flb_test_kinesis_custom_port(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "time_key", "time", NULL); + flb_output_set(ctx, out_ffd, "port", "8443", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_invalid_port(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "port", "99999", NULL); // Invalid port + + ret = flb_start(ctx); + TEST_CHECK(ret != 0); // Expect failure + + flb_stop(ctx); + flb_destroy(ctx); +} /* Test list */ TEST_LIST = { @@ -196,5 +300,8 @@ TEST_LIST = { {"throughput_error", flb_test_firehose_throughput_error }, {"unknown_error", flb_test_firehose_error_unknown }, {"nonsense_error", flb_test_firehose_nonsense_error }, + {"default_port", flb_test_kinesis_default_port }, + {"custom_port", flb_test_kinesis_custom_port }, + {"invalid_port", flb_test_kinesis_invalid_port }, {NULL, NULL} };