
[File size flush interval] Best practices for reducing network costs #255

Open
liraric opened this issue Mar 20, 2023 · 3 comments

@liraric

liraric commented Mar 20, 2023

Hi all, hope everything is fine.
Currently we are working on a feature that uses this connector to dump Debezium CDC messages. Our deployed connector is running with default values, and as far as I can tell, the only ways to control flush intervals are time-based (offset.flush.interval.ms, which defaults to 60 seconds) and the number of records per flushed file (file.max.records; I didn't see a default value, so I'm assuming it writes as many records as possible to the file during the flush interval window).
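
For reference, a minimal sketch of where those two settings live (values are illustrative, not recommendations; note that offset.flush.interval.ms is a worker-level property, which comes up again below):

# worker.properties (applies to the whole Connect worker)
offset.flush.interval.ms=60000

# connector config (GCS sink level)
# file.max.records=0 appears to mean "no per-file record limit",
# which would match the "as many records as fit in the window" behavior
file.max.records=500000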

Our current cost for writing these files is quite large, so I was wondering if you have any best practices for improving the connector's performance in order to reduce network costs when writing to GCS.

Thanks for any help!
Cheers

@chadleeshaw

chadleeshaw commented Oct 4, 2023

I am also struggling with flush intervals and file sizes. I want to flush only every ten minutes, or when the heap fills up. I currently have these settings:

connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector
topics=MyTopic
tasks.max=4
format.output.fields=key,value,offset,timestamp
gcs.bucket.name=MyBucket
gcs.credentials.path=MyCreds
file.name.timestamp.timezone=America/Denver
format.output.type=jsonl
file.name.template={{topic}}/{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/{{timestamp:unit=HH}}/{{topic}}-{{partition}}-{{start_offset}}.gz
file.max.records=1000000
offset.flush.interval.ms=600000
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

But I'm still flushing every couple of minutes even though I have 32 GB of heap... I don't get it.
I'm getting about 250K messages (77 MB) per file, compressed to 7 MB.

Looks like someone stated that the flush interval has to be set on the worker, not in the connector settings: #263
https://docs.confluent.io/platform/current/connect/references/allconfigs.html
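
If that's right, it would explain the numbers above: with the connector-level setting ignored, the worker's offset.flush.interval.ms (60 seconds by default) drives the flush cadence, which is in the right ballpark for what I'm seeing. At ~250K messages per flush every couple of minutes (roughly 2K messages/sec), it would take about eight minutes of traffic to reach file.max.records=1000000, so that cap never triggers.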

@chadleeshaw

Looks like you have to set offset.flush.interval.ms inside the worker.properties file. Setting it inside the connector config will not work.
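
For anyone else landing here, a minimal sketch of what that looks like (the ten-minute value matches the connector config above; everything else in your worker file stays as it is, and the worker needs a restart to pick it up):

# worker.properties -- worker-level setting, applies to all connectors on this worker
offset.flush.interval.ms=600000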

@Setsushin

Hi all, I've also run into this issue.
After increasing offset.flush.interval.ms inside the worker.properties file, the sink speed becomes normal.
But it affects the other topics/connectors a lot; their throughput slows down as well.
So I wonder if there is an optimal solution, or a plan to improve this. Thanks!
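
One workaround that might help here (my own suggestion, not something confirmed in this thread): run the GCS sink on its own Connect worker, so the long flush interval applies only to it. A minimal distributed-mode sketch, with hypothetical broker address and topic names:

# gcs-worker.properties -- a dedicated Connect cluster just for the GCS sink
bootstrap.servers=kafka:9092
group.id=connect-gcs                      # distinct group.id = separate Connect cluster
offset.flush.interval.ms=600000           # long interval affects only this worker
# internal topics must be distinct from the main cluster's
offset.storage.topic=connect-gcs-offsets
config.storage.topic=connect-gcs-configs
status.storage.topic=connect-gcs-status
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter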

@ahmedsobeh ahmedsobeh transferred this issue from Aiven-Open/gcs-connector-for-apache-kafka Aug 27, 2024