
out_s3: Add parquet compression type #8837

Status: Open. Wants to merge 31 commits into base: master.
Conversation

cosmo0920
Contributor

@cosmo0920 cosmo0920 commented May 20, 2024

With the columnify command, we're able to support the parquet format in out_s3.


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    trace
    HTTP_Server  Off
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name dummy
    Tag  dummy.local
    dummy {"boolean": false, "int": 1, "long": 1, "float": 1.1, "double": 1.1, "bytes": "foo", "string": "foo"}

[OUTPUT]
    Name  s3
    Match dummy*
    Region us-east-2
    bucket fbit-parquet-s3
    Use_Put_object true
    compression parquet
    parquet.schema_file schema-dummy.avsc

schema-dummy.avsc

{
  "type": "record",
  "name": "DummyMessages",
  "fields" : [
    {"name": "boolean", "type": "boolean"},
    {"name": "int",     "type": "int"},
    {"name": "long",    "type": "long"},
    {"name": "float",   "type": "float"},
    {"name": "double",  "type": "double"},
    {"name": "bytes",   "type": "bytes"},
    {"name": "string",  "type": "string"}
  ]
}
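As a quick sanity check, the dummy record above can be validated against this Avro schema before columnify ever runs. The sketch below is illustrative only: the `AVRO_TO_PY` mapping and the `check()` helper are assumptions of this example, not part of the plugin or of columnify.

```python
import json

# Avro primitive -> Python type accepted in this sketch. JSON has no
# separate long/float/double/bytes, so several Avro types collapse to
# the same Python type here (an assumption of this illustration).
AVRO_TO_PY = {
    "boolean": bool,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "bytes": str,   # JSON carries bytes as a string in the dummy input
    "string": str,
}

schema = json.loads("""{
  "type": "record",
  "name": "DummyMessages",
  "fields": [
    {"name": "boolean", "type": "boolean"},
    {"name": "int",     "type": "int"},
    {"name": "long",    "type": "long"},
    {"name": "float",   "type": "float"},
    {"name": "double",  "type": "double"},
    {"name": "bytes",   "type": "bytes"},
    {"name": "string",  "type": "string"}
  ]
}""")

record = json.loads('{"boolean": false, "int": 1, "long": 1, '
                    '"float": 1.1, "double": 1.1, '
                    '"bytes": "foo", "string": "foo"}')

def check(record, schema):
    """Return (field, problem) pairs; an empty list means the record fits."""
    problems = []
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        if name not in record:
            problems.append((name, "missing"))
        elif not isinstance(record[name], AVRO_TO_PY[ftype]):
            problems.append((name, "type mismatch"))
    return problems

print(check(record, schema))  # → []
```

An empty result confirms the in_dummy record and the schema file agree, which rules out one class of conversion failures before testing the plugin end to end.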
  • Debug log output from testing the change
Fluent Bit v3.0.4
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/05/23 15:33:05] [ info] Configuration:
[2024/05/23 15:33:05] [ info]  flush time     | 5.000000 seconds
[2024/05/23 15:33:05] [ info]  grace          | 5 seconds
[2024/05/23 15:33:05] [ info]  daemon         | 0
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  inputs:
[2024/05/23 15:33:05] [ info]      dummy
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  filters:
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  outputs:
[2024/05/23 15:33:05] [ info]      s3.0
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  collectors:
[2024/05/23 15:33:05] [ info] [fluent bit] version=3.0.4, commit=7de2c45227, pid=952893
[2024/05/23 15:33:05] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/05/23 15:33:05] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/05/23 15:33:05] [ info] [cmetrics] version=0.9.0
[2024/05/23 15:33:05] [ info] [ctraces ] version=0.5.1
[2024/05/23 15:33:05] [ info] [input:dummy:dummy.0] initializing
[2024/05/23 15:33:05] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/05/23 15:33:05] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/05/23 15:33:05] [debug] [s3:s3.0] created event channels: read=23 write=24
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] Using upload size 100000000 bytes
[2024/05/23 15:33:05] [debug] [output:s3:s3.0] parquet.compression format is SNAPPY
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] parquet.record_type format is jsonl
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] parquet.schema_type format is avro
[2024/05/23 15:33:05] [debug] [aws_credentials] Initialized Env Provider in standard chain
[2024/05/23 15:33:05] [debug] [aws_credentials] creating profile (null) provider
[2024/05/23 15:33:05] [debug] [aws_credentials] Initialized AWS Profile Provider in standard chain
[2024/05/23 15:33:05] [debug] [aws_credentials] Not initializing EKS provider because AWS_ROLE_ARN was not set
[2024/05/23 15:33:05] [debug] [aws_credentials] Not initializing ECS Provider because AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set
[2024/05/23 15:33:05] [debug] [aws_credentials] Initialized EC2 Provider in standard chain
[2024/05/23 15:33:05] [debug] [aws_credentials] Sync called on the EC2 provider
[2024/05/23 15:33:05] [debug] [aws_credentials] Init called on the env provider
[2024/05/23 15:33:05] [debug] [aws_credentials] upstream_set called on the EC2 provider
[2024/05/23 15:33:05] [ info] [sp] stream processor started
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] worker #0 started
[2024/05/23 15:33:05] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:06] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:07] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:08] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:09] [trace] [task 0x61437a0] created (id=0)
[2024/05/23 15:33:09] [debug] [task] created task=0x61437a0 id=0 OK
[2024/05/23 15:33:09] [debug] [output:s3:s3.0] task_id=0 assigned to thread #0
[2024/05/23 15:33:09] [debug] [output:s3:s3.0] Creating upload timer with frequency 60s
[2024/05/23 15:33:09] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:09] [debug] [out flush] cb_destroy coro_id=0
[2024/05/23 15:33:09] [trace] [coro] destroy coroutine=0x6143a40 data=0x6143a60
[2024/05/23 15:33:09] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/05/23 15:33:09] [debug] [task] destroy task=0x61437a0 (task_id=0)
[2024/05/23 15:33:10] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:11] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
^C[2024/05/23 15:33:12] [engine] caught signal (SIGINT)
[2024/05/23 15:33:12] [trace] [engine] flush enqueued data
[2024/05/23 15:33:12] [trace] [task 0x61bc460] created (id=0)
[2024/05/23 15:33:12] [debug] [out flush] cb_destroy coro_id=1
[2024/05/23 15:33:12] [debug] [task] created task=0x61bc460 id=0 OK
[2024/05/23 15:33:12] [trace] [coro] destroy coroutine=0x61bc700 data=0x61bc720
[2024/05/23 15:33:12] [debug] [output:s3:s3.0] task_id=0 assigned to thread #0
[2024/05/23 15:33:12] [ warn] [engine] service will shutdown in max 5 seconds
[2024/05/23 15:33:12] [ info] [input] pausing dummy.0
[2024/05/23 15:33:12] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/05/23 15:33:12] [debug] [task] destroy task=0x61bc460 (task_id=0)
[2024/05/23 15:33:12] [ info] [engine] service has stopped (0 pending tasks)
[2024/05/23 15:33:12] [ info] [input] pausing dummy.0
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] thread worker #0 stopping...
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] thread worker #0 stopped
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] Sending all locally buffered data to S3
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] Pre-compression chunk size is 882, After compression, chunk is 981 bytes
[2024/05/23 15:33:12] [trace] [upstream] get new connection for s3.us-east-2.amazonaws.com:443, net setup:
net.connect_timeout        = 10 seconds
net.source_address         = any
net.keepalive              = enabled
net.keepalive_idle_timeout = 30 seconds
net.max_worker_connections = 0
[2024/05/23 15:33:13] [trace] [net] connection #27 in process to s3.us-east-2.amazonaws.com:443
[2024/05/23 15:33:13] [trace] [tls] connection and handshake OK
[2024/05/23 15:33:13] [trace] [io] connection OK
[2024/05/23 15:33:13] [debug] [upstream] KA connection #27 to s3.us-east-2.amazonaws.com:443 is connected
[2024/05/23 15:33:13] [debug] [http_client] not using http_proxy for header
[2024/05/23 15:33:13] [debug] [aws_credentials] Requesting credentials from the env provider..
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] trying 1608 bytes
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] ret=1608 total=1608/1608
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] trying 981 bytes
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] ret=981 total=981/981
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_read] try up to 4095 bytes
[2024/05/23 15:33:14] [trace] [io coro=(nil)] [net_read] ret=299
[2024/05/23 15:33:14] [debug] [upstream] KA connection #27 to s3.us-east-2.amazonaws.com:443 is now available

Install columnify with:

$ go install github.com/reproio/columnify/cmd/columnify@latest
# ...
$ which columnify
/path/to/columnify
$ echo $?
0
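For manual verification outside Fluent Bit, the same jsonl-to-parquet conversion the plugin delegates to columnify can be driven from a short script. The flag names below (`-schemaType`, `-schemaFile`, `-recordType`) follow columnify's README; the helper and the file names are hypothetical for this example, and the conversion only runs when columnify is actually on PATH.

```python
import shutil
import subprocess

def columnify_cmd(schema_file, record_type, input_path):
    """Build a columnify invocation (hypothetical helper; flag names
    per columnify's CLI -- verify against its README)."""
    return ["columnify",
            "-schemaType", "avro",
            "-schemaFile", schema_file,
            "-recordType", record_type,
            input_path]

cmd = columnify_cmd("schema-dummy.avsc", "jsonl", "dummy.jsonl")

# Run the conversion only when columnify is installed; parquet data
# is captured from stdout in this sketch.
if shutil.which("columnify"):
    parquet_bytes = subprocess.run(cmd, check=True,
                                   capture_output=True).stdout
```

This mirrors the configuration above: the same schema file and `jsonl` record type, applied to a captured chunk of dummy records.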
  • Attached Valgrind output that shows no leaks or memory corruption was found
==35435== 
==35435== HEAP SUMMARY:
==35435==     in use at exit: 0 bytes in 0 blocks
==35435==   total heap usage: 20,529 allocs, 20,529 frees, 3,530,224 bytes allocated
==35435== 
==35435== All heap blocks were freed -- no leaks are possible
==35435== 
==35435== For lists of detected and suppressed errors, rerun with: -s
==35435== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#1380

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0; by submitting this pull request, I understand that this code will be released under the terms of that license.

Review threads (outdated, resolved):
  • plugins/out_s3/s3.c (2 threads)
  • plugins/out_s3/s3_win32_compat.h
Signed-off-by: Hiroshi Hatake <[email protected]>
@cosmo0920
Contributor Author

I finished extracting the parquet-related functions into s3_parquet.c. 💪

@cosmo0920
Contributor Author

Hi, @PettitWesley and @edsiper.
Your reviews still request changes, but I have already addressed those requests.
Could you take another look?

@edsiper
Member

edsiper commented Jul 8, 2024

Let's move it to the next milestone while waiting for AWS feedback.

@edsiper edsiper self-assigned this Jul 8, 2024
@edsiper edsiper removed this from the Fluent Bit v3.1.0 milestone Jul 8, 2024
@csm-kb

csm-kb commented Sep 6, 2024

Hey team! AWS feedback is taking a little while... any update, or anything I or we can do to help this move along? Looking to leverage the impending feature for my canonical log engine and optimize some storage costs 😎

@edsiper
Member

edsiper commented Sep 27, 2024

Hi @PettitWesley, can you please take a look at this extension of the S3 connector?

@mmackic

mmackic commented Oct 18, 2024

@edsiper, if PettitWesley is no longer active or available for approval, is there anybody else from AWS who could approve the changes?

@mmackic

mmackic commented Oct 24, 2024

Hi, is there any update on this request? We really need this functionality.

7 participants