Skip to content

Commit

Permalink
multiline: cri parser backend, do not use regex
Browse files Browse the repository at this point in the history
Signed-off-by: ryanohnemus <[email protected]>
  • Loading branch information
ryanohnemus committed Sep 24, 2024
1 parent 072ffe7 commit 7ab84ab
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 8 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#define FLB_PARSER_JSON 2
#define FLB_PARSER_LTSV 3
#define FLB_PARSER_LOGFMT 4
#define FLB_PARSER_CRI 5

struct flb_parser_types {
char *key;
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ if(FLB_PARSER)
flb_parser_regex.c
flb_parser_json.c
flb_parser_decoder.c
flb_parser_cri.c
flb_parser_ltsv.c
flb_parser_logfmt.c
)
Expand Down
11 changes: 11 additions & 0 deletions src/flb_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ int flb_parser_logfmt_do(struct flb_parser *parser,
void **out_buf, size_t *out_size,
struct flb_time *out_time);

int flb_parser_cri_do(struct flb_parser *parser,
const char *buf, size_t length,
void **out_buf, size_t *out_size,
struct flb_time *out_time);
/*
* This function is used to free all aspects of a parser
* which is provided by the caller of flb_create_parser.
Expand Down Expand Up @@ -202,6 +206,9 @@ struct flb_parser *flb_parser_create(const char *name, const char *format,
else if (strcasecmp(format, "logfmt") == 0) {
p->type = FLB_PARSER_LOGFMT;
}
else if (strcasecmp(format, "cri") == 0) {
p->type = FLB_PARSER_CRI;
}
else {
flb_error("[parser:%s] Invalid format %s", name, format);
mk_list_del(&p->_head);
Expand Down Expand Up @@ -1007,6 +1014,10 @@ int flb_parser_do(struct flb_parser *parser, const char *buf, size_t length,
return flb_parser_logfmt_do(parser, buf, length,
out_buf, out_size, out_time);
}
else if (parser->type == FLB_PARSER_CRI) {
return flb_parser_cri_do(parser, buf, length,
out_buf, out_size, out_time);
}

return -1;
}
Expand Down
139 changes: 139 additions & 0 deletions src/flb_parser_cri.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#define _GNU_SOURCE
#include <time.h>

#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_parser_decoder.h>
#include <fluent-bit/flb_unescape.h>
#include <fluent-bit/flb_mem.h>

#define CRI_SPACE_DELIM " "

int flb_parser_cri_do(struct flb_parser *parser,
const char *in_buf, size_t in_size,
void **out_buf, size_t *out_size,
struct flb_time *out_time)
{
int ret;
time_t time_lookup = 0;
double tmfrac = 0;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
char *dec_out_buf;
size_t dec_out_size;
size_t map_size = 4; /* always 4 fields for CRI */
char *time_key;
size_t time_key_len;
char* end_of_line = NULL;
char* token_end = NULL;

if (parser->time_key) {
time_key = parser->time_key;
}
else {
time_key = "time";
}
time_key_len = strlen(time_key);

/* Prepare new outgoing buffer */
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
msgpack_pack_map(&tmp_pck, map_size);

/* Time */
token_end = strpbrk(in_buf, CRI_SPACE_DELIM);
if (token_end == NULL) {
msgpack_sbuffer_destroy(&tmp_sbuf);
return -1;
}
token_end = token_end + 1; /* time + a space */

struct flb_tm tm = {0};
ret = flb_parser_time_lookup(in_buf, token_end-in_buf-1,
0, parser, &tm, &tmfrac);
if (ret == -1) {
flb_error("[parser:%s] Invalid time format %s",
parser->name, parser->time_fmt_full);
return -1;
}

msgpack_pack_str(&tmp_pck, time_key_len);
msgpack_pack_str_body(&tmp_pck, time_key, time_key_len);
msgpack_pack_str(&tmp_pck, token_end-in_buf-1);
msgpack_pack_str_body(&tmp_pck, in_buf, token_end-in_buf-1);

/* Stream */
if (!(!strncmp(token_end, "stdout ", 7) || !strncmp(token_end, "stderr ", 7))) {
msgpack_sbuffer_destroy(&tmp_sbuf);
return -1;
}

msgpack_pack_str(&tmp_pck, 6);
msgpack_pack_str_body(&tmp_pck, "stream", 6);
msgpack_pack_str(&tmp_pck, 6);
msgpack_pack_str_body(&tmp_pck, token_end, 6);
token_end = token_end + 7; /* stream + a space */

/* Partial/Full Indicator (P|F) */
if (!(!strncmp(token_end, "F ", 2) || !strncmp(token_end, "P ", 2))) {
msgpack_sbuffer_destroy(&tmp_sbuf);
return -1;
}
msgpack_pack_str(&tmp_pck, 2);
msgpack_pack_str_body(&tmp_pck, "_p", 2);
msgpack_pack_str(&tmp_pck, 1);
msgpack_pack_str_body(&tmp_pck, token_end, 1);
token_end = token_end + 2; /* indicator + a space */

/* Log */
end_of_line = strpbrk(token_end, "\n");
if (end_of_line == NULL ) {
end_of_line = in_buf+in_size;
}
msgpack_pack_str(&tmp_pck, 3);
msgpack_pack_str_body(&tmp_pck, "log", 3);
msgpack_pack_str(&tmp_pck, end_of_line-token_end);
msgpack_pack_str_body(&tmp_pck, token_end, end_of_line-token_end);

/* Export results */
time_lookup = flb_parser_tm2time(&tm, parser->time_system_timezone);
out_time->tm.tv_sec = time_lookup;
out_time->tm.tv_nsec = (tmfrac * 1000000000);

*out_buf = tmp_sbuf.data;
*out_size = tmp_sbuf.size;

/* Check if some decoder was specified */
if (parser->decoders) {
ret = flb_parser_decoder_do(parser->decoders,
tmp_sbuf.data, tmp_sbuf.size,
&dec_out_buf, &dec_out_size);
if (ret == 0) {
*out_buf = dec_out_buf;
*out_size = dec_out_size;
msgpack_sbuffer_destroy(&tmp_sbuf);
}
}

return end_of_line-in_buf;
}
1 change: 0 additions & 1 deletion src/multiline/flb_ml_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <fluent-bit/flb_log.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_rule.h>
#include <fluent-bit/multiline/flb_ml_mode.h>
#include <fluent-bit/multiline/flb_ml_group.h>

int flb_ml_parser_init(struct flb_ml_parser *ml_parser)
Expand Down
11 changes: 4 additions & 7 deletions src/multiline/flb_ml_parser_cri.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_parser.h>

#define FLB_ML_CRI_REGEX \
"^(?<time>.+?) (?<stream>stdout|stderr) (?<_p>F|P) (?<log>.*)$"
#define FLB_ML_CRI_TIME \
"%Y-%m-%dT%H:%M:%S.%L%z"

/* Creates a parser for Docker */
/* Creates a parser for CRI */
static struct flb_parser *cri_parser_create(struct flb_config *config)
{
struct flb_parser *p;

p = flb_parser_create("_ml_cri", /* parser name */
"regex", /* backend type */
FLB_ML_CRI_REGEX, /* regex */
"cri", /* backend type */
NULL, /* regex */
FLB_FALSE, /* skip_empty */
FLB_ML_CRI_TIME, /* time format */
"time", /* time key */
Expand All @@ -49,13 +47,12 @@ static struct flb_parser *cri_parser_create(struct flb_config *config)
return p;
}

/* Our first multiline mode: 'docker' */
struct flb_ml_parser *flb_ml_parser_cri(struct flb_config *config)
{
struct flb_parser *parser;
struct flb_ml_parser *mlp;

/* Create a Docker parser */
/* Create a CRI parser */
parser = cri_parser_create(config);
if (!parser) {
return NULL;
Expand Down

0 comments on commit 7ab84ab

Please sign in to comment.