Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move variants to fiber local #521

Merged
merged 5 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This release contains **BREAKING** changes. Make sure to read and apply upgrade

- **[Breaking]** Remove ability to abort transactions using `throw(:abort)`. Please use `raise WaterDrop::Errors::AbortTransaction`.
- **[Breaking]** Disallow (similar to ActiveRecord) exiting transactions with `return`, `break` or `throw`.
- [Enhancement] Make variants fiber safe.

### Upgrade Notes

Expand Down
8 changes: 6 additions & 2 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class Producer
include Transactions
include ::Karafka::Core::Helpers::Time

# Local storage for given thread waterdrop client references for variants
::Fiber.send(:attr_accessor, :waterdrop_clients)

# Which of the inline flow errors do we want to intercept and re-bind
SUPPORTED_FLOW_ERRORS = [
Rdkafka::RdkafkaError,
Expand Down Expand Up @@ -276,10 +279,11 @@ def wait(handler)
)
end

# @return [Producer::Context] the variant config. Either custom if built using `#with` or
# @return [Producer::Variant] the variant config. Either custom if built using `#with` or
# a default one.
def current_variant
Thread.current[id] || @default_variant
Fiber.current.waterdrop_clients ||= {}
Fiber.current.waterdrop_clients[id] || @default_variant
end

# Runs the client produce method with a given message
Expand Down
5 changes: 3 additions & 2 deletions lib/waterdrop/producer/variant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ def default?
scope.instance_methods(false).each do |method_name|
class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{method_name}(*args, &block)
Thread.current[@producer.id] = self
ref = Fiber.current.waterdrop_clients ||= {}
ref[@producer.id] = self

@producer.#{method_name}(*args, &block)
ensure
Thread.current[@producer.id] = nil
ref[@producer.id] = nil
end
RUBY
end
Expand Down