Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kinesis] Allow fully manual checkpointing #340

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

tomconnors
Copy link

For my use case, neither checkpointing every n seconds nor checkpointing by returning true from the processing function is fine-grained enough. I suggest the changes in this PR. Note that I haven't tested this code - wanted to see whether you're open to this change before I spend any actual time on it.

(*' 1000 checkpoint))))
(some (partial mark-checkpoint checkpointer) [1 2 3 4 5]))))))))
(if (= checkpoint-strategy :manual)
(binding [*checkpointer* checkpointer]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's bound to *checkpointer* when the checkpoint-strategy is not :manual?

(processor (functor/fmap
(partial marshall deserializer)
(vec (seq records)))))
(if (or (processor (functor/fmap (partial marshall deserializer)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard to tell just from reading the diff here, but is this the else block that gets run if checkpoint-strategy is not :manual? Is there any way to remove the duplication of the fmap? (Not a big deal but a little bit of a smell, but this codebase is already a bit of compost heap.)

checkpoint-strategy (cond
(number? checkpoint) :timeout
;; not sure of a good name for this strategy:
(false? checkpoint) :boolean
Copy link
Owner

@mcohen01 mcohen01 Oct 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the :boolean case handled in the processor function above? It's hard to read if the behavior of the :timeout case here is the same as the current behavior.

@tomconnors
Copy link
Author

Cleaned up the things you had questions about. Now *checkpointer* is always bound when processor is called, and each checkpoint strategy is explicitly handled, except for :manual, which is handled by doing nothing.

@mcohen01
Copy link
Owner

So is this ready to be merged?

@tomconnors
Copy link
Author

With this latest commit it's working for me and I'd say it's ready to be merged. The version of the aws sdk that amazonica uses doesn't define a UserRecord.getEncryptionType method, so I just removed that line. I didn't see anywhere else in the codebase that :encryption-type was referenced.

@tomconnors
Copy link
Author

Anything I can change here to get this merged?

@dijonkitchen
Copy link

@mcohen01 ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants