diff --git a/nri-kafka/src/Kafka/Worker.hs b/nri-kafka/src/Kafka/Worker.hs index b403b32a..ac40b6a1 100644 --- a/nri-kafka/src/Kafka/Worker.hs +++ b/nri-kafka/src/Kafka/Worker.hs @@ -27,6 +27,7 @@ module Kafka.Worker Internal.receiveRawMessages, Internal.PartitionOffset (..), Partition.SeekCmd (..), + Partition.ProcessAttemptsCount (..), Internal.CommitToKafkaAsWell (..), ) where diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 912b400f..d33c050d 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -71,6 +71,7 @@ data PartitionOffset = PartitionOffset -- | The partition's offset. offset :: Int } + deriving (Show) -- | Create a subscription for a topic. -- @@ -92,7 +93,7 @@ subscription topic callback = { topic = Kafka.Topic topic, onMessage = Partition.MessageCallback - ( \_ msg -> do + ( \_ _ msg -> do callback msg Task.succeed Partition.NoSeek ), @@ -118,14 +119,14 @@ subscription topic callback = -- > sql -- > "SELECT partition, offset FROM offsets WHERE partition = %" -- > [partitions] ) --- > (\msg -> Debug.todo "Process your message here!") +-- > (\retryCount msg -> Debug.todo "Process your message here!") -- > process settings subscription subscriptionManageOwnOffsets :: (Aeson.FromJSON msg, Aeson.ToJSON msg) => Text -> CommitToKafkaAsWell -> ([Int] -> Task Text (List PartitionOffset)) -> - (PartitionOffset -> msg -> Task Text Partition.SeekCmd) -> + (PartitionOffset -> Partition.ProcessAttemptsCount -> msg -> Task Text Partition.SeekCmd) -> TopicSubscription subscriptionManageOwnOffsets topic commitToKafkaAsWell fetchOffsets callback = TopicSubscription diff --git a/nri-kafka/src/Kafka/Worker/Partition.hs b/nri-kafka/src/Kafka/Worker/Partition.hs index c1992930..dd73d17a 100644 --- a/nri-kafka/src/Kafka/Worker/Partition.hs +++ b/nri-kafka/src/Kafka/Worker/Partition.hs @@ -11,6 +11,7 @@ module Kafka.Worker.Partition SeekCmd (..), CommitOffsets (..), MessageFormat (..), + ProcessAttemptsCount (..), -- just exported for tests microSecondsDelayForAttempt, OnStartup (OnStartup), @@ -95,7 +96,7 @@ newtype OnCleanup = OnCleanup (Prelude.IO ()) data MessageCallback where MessageCallback :: (Show e, Aeson.ToJSON msg, Aeson.FromJSON msg) => - (Consumer.ConsumerRecord () () -> msg -> Task e SeekCmd) -> + (Consumer.ConsumerRecord () () -> ProcessAttemptsCount -> msg -> Task e SeekCmd) -> MessageCallback data CommitOffsets @@ -255,7 +256,7 @@ processMsgLoop skipOrNot messageFormat commitOffsets observabilityHandler state Platform.setTracingSpanDetailsIO log details handleFailures log <| do msg <- decodeMessage messageFormat record - runCallback record {Consumer.crKey = (), Consumer.crValue = ()} msg + runCallback record {Consumer.crKey = (), Consumer.crValue = ()} processAttempts msg |> Task.mapError WorkerCallbackFailed |> Task.onError ( \err -> do @@ -282,7 +283,7 @@ microSecondsDelayForAttempt attempts = 2 Prelude.^ (min attempts 10) * 1_000_000 handleFailures :: - Show e => + (Show e) => Platform.LogHandler -> Task (WorkerError e) a -> Prelude.IO () diff --git a/nri-kafka/test/Helpers.hs b/nri-kafka/test/Helpers.hs index 934a3c02..62a3dff6 100644 --- a/nri-kafka/test/Helpers.hs +++ b/nri-kafka/test/Helpers.hs @@ -4,6 +4,7 @@ module Helpers stopWorker, test, sendSync, + spawnWorkerManagingOwnOffsets, ) where @@ -78,6 +79,42 @@ spawnWorker handler' topic callback = Async.link async Prelude.pure (Worker async) +spawnWorkerManagingOwnOffsets :: + (Aeson.ToJSON msg, Aeson.FromJSON msg) => + TestHandler -> + Internal.Topic -> + (Worker.PartitionOffset -> Worker.ProcessAttemptsCount -> msg -> STM.STM Worker.SeekCmd) -> + Expect.Expectation' Worker +spawnWorkerManagingOwnOffsets handler' topic callback = + Expect.fromIO <| do + settings <- + case Environment.decodeDefaults Worker.Settings.decoder of + Ok settings' -> Prelude.pure settings' + Err err -> Prelude.fail (Text.toList err) + async <- + Kafka.Worker.Internal.processWithoutShutdownEnsurance + settings + (Consumer.ConsumerGroupId "group") + ( Worker.subscriptionManageOwnOffsets + (Internal.unTopic topic) + Worker.CommitToKafkaAsWell + ( \partitions -> + partitions + |> List.map (\id -> Worker.PartitionOffset {Worker.partitionId = id, Worker.offset = 0}) + |> Task.succeed + ) + ( \partitionOffset retryCount msg -> do + callback partitionOffset retryCount msg + |> STM.atomically + |> map Ok + |> Platform.doAnything (doAnything handler') + ) + ) + |> Async.race_ (returnWhenTerminating handler') + |> Async.async + Async.link async + Prelude.pure (Worker async) + -- | Stops a single worker stopWorker :: Worker -> Expect.Expectation stopWorker (Worker async) = @@ -119,7 +156,7 @@ testHandler Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeou Prelude.pure TestHandler {producer, doAnything, terminator} -- | puts a message synchronously onto a topic-partition -sendSync :: Aeson.ToJSON a => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation +sendSync :: (Aeson.ToJSON a) => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation sendSync handler topicName partitionId msg' = Platform.tracingSpan "Sync send Kafka messages" @@ -175,7 +212,7 @@ record topicName partitionId val = -- | test helper, that yields a new @Kafka.Topic@ and @TestHandler@ test :: - Stack.HasCallStack => + (Stack.HasCallStack) => Text -> ((Internal.Topic, TestHandler) -> Expect.Expectation) -> Test.Test diff --git a/nri-kafka/test/Spec/Kafka/Worker/Integration.hs b/nri-kafka/test/Spec/Kafka/Worker/Integration.hs index 56b75af4..8ee39ea6 100644 --- a/nri-kafka/test/Spec/Kafka/Worker/Integration.hs +++ b/nri-kafka/test/Spec/Kafka/Worker/Integration.hs @@ -4,6 +4,7 @@ import qualified Control.Concurrent.STM as STM import qualified Dict import qualified Expect import qualified Helpers +import qualified Kafka.Worker as Worker import qualified Set import qualified Test import qualified Prelude @@ -55,7 +56,25 @@ tests = [ (1, [2, 1]), (2, [2, 1]) ] + ), + Helpers.test "Self-managing workers get retry count info" <| \(topic, handler) -> do + Helpers.sendSync handler topic 1 1 + msgsTVar <- atomically (STM.newTVar Set.empty) + _ <- + Helpers.spawnWorkerManagingOwnOffsets + handler + topic + ( \partitionOffset (Worker.ProcessAttemptsCount retryCount) msg -> do + STM.modifyTVar' msgsTVar (Set.insert (msg, retryCount)) + if retryCount < 1 + then STM.throwSTM (Prelude.userError "retry please") + else + Prelude.pure + <| Worker.SeekToOffset ((Worker.offset partitionOffset) + 1) ) + msgs' <- waitFor msgsTVar (\items -> Set.size items == 1) + -- Assert that the message was recorded on its first retry + msgs' |> Expect.equal (Set.fromList [(1, 1)]) ] ] @@ -70,7 +89,7 @@ waitFor tVar pred = then Prelude.pure val else STM.retry -groupDictAndMap :: Ord b => (a -> (b, c)) -> List a -> Dict.Dict b (List c) +groupDictAndMap :: (Ord b) => (a -> (b, c)) -> List a -> Dict.Dict b (List c) groupDictAndMap f = List.foldr ( \x ->