diff --git a/CHANGELOG.md b/CHANGELOG.md index 22fac0d..4996404 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ This release contains **BREAKING** changes. Make sure to read and apply upgrade - **[Breaking]** Remove ability to abort transactions using `throw(:abort)`. Please use `raise WaterDrop::Errors::AbortTransaction`. - **[Breaking]** Disallow (similar to ActiveRecord) exiting transactions with `return`, `break` or `throw`. +- [Enhancement] Make variants fiber safe. ### Upgrade Notes diff --git a/lib/waterdrop/producer.rb b/lib/waterdrop/producer.rb index 1acb2bf..f439b9f 100644 --- a/lib/waterdrop/producer.rb +++ b/lib/waterdrop/producer.rb @@ -10,6 +10,9 @@ class Producer include Transactions include ::Karafka::Core::Helpers::Time + # Local storage for given thread waterdrop client references for variants + ::Fiber.send(:attr_accessor, :waterdrop_clients) + # Which of the inline flow errors do we want to intercept and re-bind SUPPORTED_FLOW_ERRORS = [ Rdkafka::RdkafkaError, @@ -276,10 +279,11 @@ def wait(handler) ) end - # @return [Producer::Context] the variant config. Either custom if built using `#with` or + # @return [Producer::Variant] the variant config. Either custom if built using `#with` or # a default one. def current_variant - Thread.current[id] || @default_variant + Fiber.current.waterdrop_clients ||= {} + Fiber.current.waterdrop_clients[id] || @default_variant end # Runs the client produce method with a given message diff --git a/lib/waterdrop/producer/variant.rb b/lib/waterdrop/producer/variant.rb index 914501a..6dbd359 100644 --- a/lib/waterdrop/producer/variant.rb +++ b/lib/waterdrop/producer/variant.rb @@ -76,11 +76,12 @@ def default? scope.instance_methods(false).each do |method_name| class_eval <<-RUBY, __FILE__, __LINE__ + 1 def #{method_name}(*args, &block) - Thread.current[@producer.id] = self + ref = Fiber.current.waterdrop_clients ||= {} + ref[@producer.id] = self @producer.#{method_name}(*args, &block) ensure - Thread.current[@producer.id] = nil + ref[@producer.id] = nil end RUBY end