
Basic integration tests #48

Open: sv3ndk wants to merge 5 commits into base: master
Conversation

@sv3ndk commented Apr 10, 2021

Hi,

Thanks for developing this Flink DynamoDB sink. I'm looking for exactly such a sink for my current project, and I started experimenting with some integration tests that might be useful to this project.

The main point of this PR is IntegrationTest.java, which starts a local instance of DynamoDB (with LocalStack and Testcontainers), launches a minimalistic Flink application that relies on the connector to push records to DynamoDB, and then checks the content of the DB.

Both tests pass :) However, I believe I'm able to reproduce a race condition in the connector with some specific parameters. I'm happy to submit more info about that in a subsequent PR if you want.

I also added a log4j configuration for test execution, although the flag -Dlog4j.configurationFile=log4j2.xml must be specified when starting the tests to make it effective, since the Flink test classes seem to bundle a log4j2-test.properties file that silences the logs. I'm not sure why that file is bundled in those libs, TBH.
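For reference, the same flag can be wired into the build so it does not have to be passed by hand on every run. This is a sketch assuming the project builds with Maven and runs tests through the Surefire plugin (the plugin coordinates are standard, the property value mirrors the flag above):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <!-- equivalent to passing -Dlog4j.configurationFile=log4j2.xml -->
      <log4j.configurationFile>log4j2.xml</log4j.configurationFile>
    </systemPropertyVariables>
  </configuration>
</plugin>
```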

I hope you find this useful. Of course, any feedback is welcome.


CLAassistant commented Apr 10, 2021

CLA assistant check
All committers have signed the CLA.

@nirtsruya
Collaborator

Hi sv3ndk!
Thank you for the contribution, it is very welcome! It would also be very interesting to see more info about the race condition; that would be very helpful!
I will look at the PR and try to merge ASAP!
Thanks

@sv3ndk
Author

sv3ndk commented Apr 16, 2021

Thanks for the kind feedback @nirtsruya .
Have you had a chance to look into it? I have some time this weekend to work on this a bit more if needed.

@sv3ndk
Author

sv3ndk commented May 16, 2021

Hi everyone,

I understand you're not currently focusing on this connector, but since this repo has been very helpful to me I thought I'd post an update on my progress here.

This connector works pretty well but I observed the following quirks:

  • the usage of the ExecutorService in DynamoDBProducer may lead to race conditions (admittedly under quite high load): when I write a test sending thousands of messages all mapping to the same DynamoDB primary key, the latest one is not always the one that ends up being persisted. Setting corePoolSize to 1 seems to resolve the issue, which IMHO is better aligned with Flink semantics: Flink generally assumes sequential processing per key, and if we want to distribute the load more we can simply increase the sink's .parallelism() and let Flink deal with the threading details
  • there is a minor bug in the exponential back-off: a multiplication is used instead of an exponentiation
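To illustrate the back-off point: the retry delay should grow as base × 2^attempt rather than base × attempt. A minimal sketch (the constants and method name are mine, not the connector's actual code):

```java
// A sketch of the back-off fix: the delay grows as BASE * 2^attempt, capped.
// BASE_DELAY_MS, MAX_DELAY_MS and the method name are illustrative,
// not the connector's actual constants.
public class BackoffSketch {
    static final long BASE_DELAY_MS = 100;
    static final long MAX_DELAY_MS = 10_000;

    // Exponential back-off: the delay doubles on every retry attempt.
    static long exponentialDelay(int attempt) {
        // 1L << attempt is 2^attempt; the min() guards against shift overflow.
        long delay = BASE_DELAY_MS * (1L << Math.min(attempt, 20));
        return Math.min(delay, MAX_DELAY_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
            // 100, 200, 400, 800, ... up to the 10s cap
            System.out.println("attempt " + attempt + " -> " + exponentialDelay(attempt) + " ms");
        }
    }
}
```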
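And to illustrate the ordering point: a single-threaded executor (the effect of corePoolSize = 1) drains its queue strictly in submission order, so the last write submitted for a key is deterministically the last one applied. A small self-contained sketch, not the connector's actual code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of why a single writer thread restores per-key ordering: tasks run
// strictly in submission order, so no later-submitted write can be overtaken
// by an earlier one. Illustrative code only.
public class SingleWriterSketch {

    // Submits writes 0..n for the same primary key and returns the value
    // that ends up "persisted" (here: stored in an in-memory map).
    static int lastValueWritten(int n) {
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ConcurrentMap<String, Integer> table = new ConcurrentHashMap<>();
        for (int i = 0; i <= n; i++) {
            final int value = i;
            writer.submit(() -> table.put("same-key", value));
        }
        writer.shutdown();
        try {
            writer.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return table.get("same-key");
    }

    public static void main(String[] args) {
        // With one worker thread this deterministically prints 1000;
        // with more threads an earlier value could win the race.
        System.out.println(lastValueWritten(1000));
    }
}
```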

I finally coded another dynamo sink, taking some simpler assumptions:

  • in my case I never have any deletions and I designed my stream to only emit idempotent upserts => when building a batch, if the same primary key is encountered several times, I can simply keep the latest one instead of flushing the buffer as this connector does. This is less generic, but it leads to much less frequent and larger batches, which is faster and cheaper
  • I'm using dynamoAsyncClient for dealing with asynchronous behaviors => that simplifies a lot of things, since I can rely on the various "happens after" semantics provided by CompletableFuture
  • I'm using a small buffer of one single batch => in case I need to send a new batch before the previous one is confirmed to be successful, I simply block, which in turn propagates back-pressure to Flink. It's largely sufficient for my case at the moment: writing a batch takes just a few milliseconds => we'd need pretty high traffic per sink instance before saturating that, and if necessary we can still increase Flink parallelism
  • as the connector in this repo does, I'm flushing my buffer during a snapshot, and I'm also doing it at regular intervals in case the traffic becomes idle, with a simple Flink timer provided by ProcessingTimeCallback
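The keep-the-latest-upsert-per-key batching described in the first bullet can be sketched with a LinkedHashMap, where a repeated put on the same key replaces the pending item in place instead of triggering a flush (types simplified to String for illustration; this is not the actual sink code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of "keep only the latest idempotent upsert per primary key": a later
// write simply replaces the pending item for that key, so duplicate keys never
// force an early flush. Types are simplified to String for illustration.
public class DedupBatchSketch {
    // LinkedHashMap keeps insertion order, and put() on an existing key
    // replaces the value in place without growing the batch.
    private final Map<String, String> pending = new LinkedHashMap<>();

    void add(String primaryKey, String item) {
        pending.put(primaryKey, item); // duplicate key => keep only the latest
    }

    List<String> drainBatch() {
        List<String> batch = new ArrayList<>(pending.values());
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        DedupBatchSketch sink = new DedupBatchSketch();
        sink.add("user#1", "v1");
        sink.add("user#2", "v1");
        sink.add("user#1", "v2"); // overwrites v1 for user#1, no flush needed
        System.out.println(sink.drainBatch()); // prints [v2, v1]
    }
}
```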
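The one-batch-in-flight buffer from the third bullet can be sketched with a CompletableFuture: before sending a new batch we join on the previous one, and that blocking is what propagates back-pressure. sendBatchAsync below is a stand-in for an asynchronous client call such as DynamoDbAsyncClient.batchWriteItem; the rest is illustrative, not the author's actual class:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a one-batch-in-flight buffer: submitting a new batch first blocks
// on the previous batch's CompletableFuture, which propagates back-pressure
// to the caller (Flink, in the original setting). Illustrative code only.
public class BlockingBufferSketch {
    private CompletableFuture<Void> inFlight = CompletableFuture.completedFuture(null);
    final AtomicInteger batchesWritten = new AtomicInteger();

    // Stand-in for an asynchronous client call; here it just records the write.
    private CompletableFuture<Void> sendBatchAsync(String batch) {
        return CompletableFuture.runAsync(batchesWritten::incrementAndGet);
    }

    void submit(String batch) {
        inFlight.join();              // block until the previous batch is confirmed
        inFlight = sendBatchAsync(batch);
    }

    void flush() {                    // e.g. during a snapshot
        inFlight.join();
    }

    public static void main(String[] args) {
        BlockingBufferSketch sink = new BlockingBufferSketch();
        sink.submit("batch-1");
        sink.submit("batch-2");       // waits until batch-1 has been written
        sink.flush();
        System.out.println(sink.batchesWritten.get()); // prints 2
    }
}
```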

The result is a very simple class of about 150 lines.

In case any of this is or becomes relevant to you at some point, feel free to reach out; I'm pretty sure my employer would happily share this connector.

Thanks again for having open-sourced this connector, it has been a useful step in my current project.

@nirtsruya
Collaborator

Hello sv3ndk, and thank you for your interest in this project; I am glad it could help you!
Sorry for the inactivity, but it was due to personal reasons.
Thanks for the backoff comment, will fix!
Regarding your comments:

  • The connector does not support ordering by design, and if I understood your suggestion correctly, using parallelism > 1 would not guarantee order either, since the tasks run in different JVMs and thus there would still be a race condition between them.
  • dynamoAsyncClient is indeed something I considered as well.
  • 👍 for ProcessingTimeCallback, it indeed seems like a good solution for that use case, and safer than relying only on the checkpoint intervals or implementing a thread.

Again sorry for inactivity, will make sure to check more often.
