Confirm NextShardIterator key exists before fetching from an existing consumer shard #26
base: master
Conversation
hi @jmcgrath207 Interesting, I assumed at the time something more complicated would be required if the shard count was increased, hence some random googling.. https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html
https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/
https://tutorialsdojo.com/kinesis-scaling-resharding-and-parallel-processing/
Hmm, interesting.
Hmm, there is quite a bit of Java code (haha) in that library so it's not clear (after 3m of looking) how it entirely works. I'm in favour of a simple approach, ie abort if a reshard happens. This is fine as container services will just restart, so long as on restart it then does the right thing of draining the old shards first. Open to ideas. In your PR, does skipping the shard do the right thing in this case? Guessing probably not? If it does then I am happy to merge as it's far simpler than the alternative! So it sounds like ideally it would need to (just rambling here..):
hmm, sounds complicated, would be cool though as then you can just dynamically reshard up and down and the consumer would be able to keep going like a champ hehe.
Bit more.. https://awsdocs.s3.amazonaws.com/kinesis/latest/kinesis-dg.pdf (2013)
Ahh, so the stream changes state until the reshard is done. Right.. So basically, to clarify the current behaviour: the consumer will keep going, skipping the exhausted shards. However it will then only be reading the remaining unaffected shards (ie not the new ones) until a restart. If the consumer failed/restarted before draining the old shards then that data would be lost.
sorry one more comment.. https://github.com/ungikim/kinsumer/blob/master/kinsumer/consumer.py That implementation uses a timer/poll to check the shards every hour. That's the thing currently: if you add more shards you need to restart the consumer(s) :( @akursar no wonder the
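As a rough illustration of that timer/poll idea (not kinsumer's actual code), a monotonic-clock based refresh could look something like the sketch below; the sync_shards method name and the interval are assumptions:

```python
import asyncio
import time

SHARD_REFRESH_INTERVAL = 60 * 60  # hourly, roughly what kinsumer reportedly does


async def refresh_shards_periodically(consumer):
    """Hypothetical helper: re-list shards on a monotonic timer so that
    shards created by a reshard are eventually picked up without a restart."""
    last_refresh = time.monotonic()
    while True:
        await asyncio.sleep(30)
        if time.monotonic() - last_refresh >= SHARD_REFRESH_INTERVAL:
            # assumed method that re-calls ListShards and merges the result
            await consumer.sync_shards()
            last_refresh = time.monotonic()
```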
Hey @hampsterx, So after spending some time with this change, it does work by definition; however, it's very brute force and I also failed to take into consideration the checkpointer portion, where it would basically blacklist the shard once exhausted. So in my scenario, I had a stream with 3 shards and bumped it to 4, then 5, in a 24 hour period. This caused the consumer to return 22 shards in total, with most being empty. With this change it would go through all the shards (that are active) to see if they have data or not. Not great, but it works. Saying that, I think that part is unavoidable during our first run, however with the checkpointer being aware of it we can avoid that once more consumers are added, assuming the redis checkpointer is being used. Here is what I am thinking I should change:
With this we would still get the remaining records from the parent, but once exhausted we tell it not to track that shard again. This idea falls apart with the memory checkpointer though. Let me know if I am missing something here. Example, around line 190 in consumer.py:

```python
if shard.get("fetch"):
    # timer (monotonic based) logic for refreshing the shard list based on
    # the data retention time that is returned
    if shard["fetch"].done():
        result = shard["fetch"].result()

        if not result:
            shard["fetch"] = None
            continue

        records = result["Records"]

        if records:
            log.debug(
                "Shard {} got {} records".format(
                    shard["ShardId"], len(records)
                )
            )
            total_items = 0
            for row in result["Records"]:
                for n, output in enumerate(
                    self.processor.parse(row["Data"])
                ):
                    await self.queue.put(output)
                total_items += n + 1

        if not result.get("NextShardIterator") or result.get("NextShardIterator") == 'null':
            shard["fetch"] = None
            self.checkpointer.deallocate(shard["ShardId"])
            continue
```
Yep, I forgot about the checkpointer part as well. The deallocate (for redis) should probably set an expiring key with a duration of the stream retention, so if restarted it would know that is an expired shard? The memory checkpointer I would not be too worried about; I don't really think it makes much sense not to use checkpointing, so it was mostly there for unit tests, probably it should just error/abort. Timer logic: was thinking it should be more like every 5m, ie a duration in seconds. Still need some logic to say don't start consuming those new shards until the old ones are drained, aye. When you get this working I'd like to try and add a unit test that changes the shard count to confirm the behaviour. Seems like it's supported in kinesalite, but whether it maps correctly to AWS behaviour or not, who knows.
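A minimal sketch of that expiring-key idea, assuming a redis.asyncio client and that the stream retention is known in hours (the key layout and method placement are illustrative, not the library's actual checkpointer API):

```python
async def deallocate(self, shard_id: str, retention_hours: int = 24):
    """Illustrative only: mark a drained (closed) shard with an expiring key
    so a restarted consumer knows to skip it until its records have aged out."""
    key = "{}:closed-shard:{}".format(self.name, shard_id)  # assumed key layout
    # TTL equal to the stream retention: once the data in the old shard has
    # expired anyway, the marker disappears and the shard can be forgotten.
    await self.client.set(key, "1", ex=retention_hours * 3600)
```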
@hampsterx any plans to merge it any time soon? With the release of ON_DEMAND mode for Kinesis, this PR is a must-have.
hi @chekan-o this PR needs some more work, that's for sure, and this was all @jmcgrath207's effort here. This feature is kind of complex, hence why very few libraries except KCL implement it. I will see if I can summon the strength to work on this soonish but no promises; might even dumb it down a bit so it just drains the old shards on a reshard event and then dies, which would be far simpler. Are you using the
Yes, it suits our load profile quite well, and we have quite a busy stream with spiky load. On-demand works well with lambdas, but when we tried it with a long-running service it would just go down for 24 hours, which is not acceptable in production.
Fully supporting shard split/merge logic requires some rework. To summarize the above, in the "maintenance" task:
- List all shards.
- Update checkpointer metadata based on shard ID at a regular interval.
- We'd have to spawn a new task when a shard doesn't exist in the checkpointer.

The "per shard" task can have the states:
If we do it this way, then we can just have each task toss its records onto the pile until they're all closed. Leaving this comment in case reading the Java code wasn't providing insights. May attempt a PR later on.
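A very rough sketch of that split of responsibilities (the state names and helper methods below are assumptions for illustration, not an existing design in this repo):

```python
import asyncio
import enum


class ShardState(enum.Enum):
    WAITING_ON_PARENT = "waiting_on_parent"  # child shard, parent not yet drained
    CONSUMING = "consuming"
    CLOSED = "closed"  # NextShardIterator gone, shard fully drained


async def maintenance_task(consumer, interval=300):
    """Hypothetical maintenance loop: periodically re-list shards, refresh
    checkpointer metadata, and spawn a per-shard task for unseen shards."""
    while True:
        for shard in await consumer.list_shards():  # assumed wrapper around ListShards
            shard_id = shard["ShardId"]
            if not consumer.checkpointer.is_known(shard_id):  # assumed helper
                consumer.spawn_shard_task(shard)  # assumed helper; task tracks a ShardState
            await consumer.checkpointer.update_metadata(shard)  # assumed helper
        await asyncio.sleep(interval)
```

Each per-shard task would then push its records onto the shared queue until it reaches CLOSED, matching the "toss their records onto the pile" behaviour described above.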
Okay. Realized why this project hasn't been active recently: AWS allowed lambdas to act as consumers, so those wanting to use Python can just use a lambda triggering on KDS events.
This project was built for KDF running in a container, but I haven't needed to use it on recent projects so it's languished a bit; the resharding support would be great though. If you can use Lambda (KDS), all the better, just not every project is suitable for it. It's equivalent to KCL (Java) or
Understood. I was confused because most of the activity around packages similar to this one died around late 2019/early 2020. Then I found out AWS had hooked up lambda support on KDS. If you'd still find it useful, then I shall attempt to do it when I get a chance. ^^
Hey Hampsterx,
So I added shards to my Kinesis stream, and from the discussion I had with an AWS engineer, those old shards would still be active for at least the data retention period of your stream. However, the results from the old shards do not contain the NextShardIterator key, even though it's mentioned it will be null in their docs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_records
This basically makes the consumer move to the next shard if the key NextShardIterator is not found, or is the string 'null', in the results after returning output.
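Put as a tiny predicate (the helper name is hypothetical), the end-of-shard condition this checks against the boto3 get_records response is:

```python
def shard_is_exhausted(result: dict) -> bool:
    """A closed, fully drained shard either omits NextShardIterator from the
    get_records response entirely or (per the docs) reports it as null."""
    next_iterator = result.get("NextShardIterator")
    return not next_iterator or next_iterator == "null"
```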