From 4a063861dc52b4258cc464637440e7b2e4cc0f82 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Wed, 5 Feb 2025 12:44:42 -0300 Subject: [PATCH 1/3] Expose retry counts Only for worker managing its own offsets --- nri-kafka/src/Kafka/Worker.hs | 1 + nri-kafka/src/Kafka/Worker/Internal.hs | 11 ++--- nri-kafka/src/Kafka/Worker/Partition.hs | 7 ++-- nri-kafka/test/Helpers.hs | 41 ++++++++++++++++++- .../test/Spec/Kafka/Worker/Integration.hs | 21 +++++++++- 5 files changed, 70 insertions(+), 11 deletions(-) diff --git a/nri-kafka/src/Kafka/Worker.hs b/nri-kafka/src/Kafka/Worker.hs index b403b32a..ac40b6a1 100644 --- a/nri-kafka/src/Kafka/Worker.hs +++ b/nri-kafka/src/Kafka/Worker.hs @@ -27,6 +27,7 @@ module Kafka.Worker Internal.receiveRawMessages, Internal.PartitionOffset (..), Partition.SeekCmd (..), + Partition.ProcessAttemptsCount (..), Internal.CommitToKafkaAsWell (..), ) where diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 912b400f..104c9265 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -71,6 +71,7 @@ data PartitionOffset = PartitionOffset -- | The partition's offset. offset :: Int } + deriving (Show) -- | Create a subscription for a topic. 
-- @@ -92,7 +93,7 @@ subscription topic callback = { topic = Kafka.Topic topic, onMessage = Partition.MessageCallback - ( \_ msg -> do + ( \_ _ msg -> do callback msg Task.succeed Partition.NoSeek ), @@ -118,14 +119,14 @@ subscription topic callback = -- > sql -- > "SELECT partition, offset FROM offsets WHERE partition = %" -- > [partitions] ) --- > (\msg -> Debug.todo "Process your message here!") +-- > (\partitionOffset retryCount msg -> Debug.todo "Process your message here!") -- > process settings subscription subscriptionManageOwnOffsets :: (Aeson.FromJSON msg, Aeson.ToJSON msg) => Text -> CommitToKafkaAsWell -> ([Int] -> Task Text (List PartitionOffset)) -> - (PartitionOffset -> msg -> Task Text Partition.SeekCmd) -> + (PartitionOffset -> Partition.ProcessAttemptsCount -> msg -> Task Text Partition.SeekCmd) -> TopicSubscription subscriptionManageOwnOffsets topic commitToKafkaAsWell fetchOffsets callback = TopicSubscription @@ -133,7 +134,7 @@ subscriptionManageOwnOffsets topic commitToKafkaAsWell fetchOffsets callback = commitToKafkaAsWell, onMessage = Partition.MessageCallback - ( \record msg -> do + ( \record retryCount msg -> do let offsetParams = PartitionOffset { partitionId = @@ -141,7 +142,7 @@ subscriptionManageOwnOffsets topic commitToKafkaAsWell fetchOffsets callback = |> partitionIdToInt, offset = Consumer.unOffset (Consumer.crOffset record) } - callback offsetParams msg + callback offsetParams retryCount msg ), offsetSource = Elsewhere diff --git a/nri-kafka/src/Kafka/Worker/Partition.hs b/nri-kafka/src/Kafka/Worker/Partition.hs index c1992930..dd73d17a 100644 --- a/nri-kafka/src/Kafka/Worker/Partition.hs +++ b/nri-kafka/src/Kafka/Worker/Partition.hs @@ -11,6 +11,7 @@ module Kafka.Worker.Partition SeekCmd (..), CommitOffsets (..), MessageFormat (..), + ProcessAttemptsCount (..), -- just exported for tests microSecondsDelayForAttempt, OnStartup (OnStartup), @@ -95,7 +96,7 @@ newtype OnCleanup = OnCleanup (Prelude.IO ()) data MessageCallback where MessageCallback
:: (Show e, Aeson.ToJSON msg, Aeson.FromJSON msg) => - (Consumer.ConsumerRecord () () -> msg -> Task e SeekCmd) -> + (Consumer.ConsumerRecord () () -> ProcessAttemptsCount -> msg -> Task e SeekCmd) -> MessageCallback data CommitOffsets @@ -255,7 +256,7 @@ processMsgLoop skipOrNot messageFormat commitOffsets observabilityHandler state Platform.setTracingSpanDetailsIO log details handleFailures log <| do msg <- decodeMessage messageFormat record - runCallback record {Consumer.crKey = (), Consumer.crValue = ()} msg + runCallback record {Consumer.crKey = (), Consumer.crValue = ()} processAttempts msg |> Task.mapError WorkerCallbackFailed |> Task.onError ( \err -> do @@ -282,7 +283,7 @@ microSecondsDelayForAttempt attempts = 2 Prelude.^ (min attempts 10) * 1_000_000 handleFailures :: - Show e => + (Show e) => Platform.LogHandler -> Task (WorkerError e) a -> Prelude.IO () diff --git a/nri-kafka/test/Helpers.hs b/nri-kafka/test/Helpers.hs index 934a3c02..62a3dff6 100644 --- a/nri-kafka/test/Helpers.hs +++ b/nri-kafka/test/Helpers.hs @@ -4,6 +4,7 @@ module Helpers stopWorker, test, sendSync, + spawnWorkerManagingOwnOffsets, ) where @@ -78,6 +79,42 @@ spawnWorker handler' topic callback = Async.link async Prelude.pure (Worker async) +spawnWorkerManagingOwnOffsets :: + (Aeson.ToJSON msg, Aeson.FromJSON msg) => + TestHandler -> + Internal.Topic -> + (Worker.PartitionOffset -> Worker.ProcessAttemptsCount -> msg -> STM.STM Worker.SeekCmd) -> + Expect.Expectation' Worker +spawnWorkerManagingOwnOffsets handler' topic callback = + Expect.fromIO <| do + settings <- + case Environment.decodeDefaults Worker.Settings.decoder of + Ok settings' -> Prelude.pure settings' + Err err -> Prelude.fail (Text.toList err) + async <- + Kafka.Worker.Internal.processWithoutShutdownEnsurance + settings + (Consumer.ConsumerGroupId "group") + ( Worker.subscriptionManageOwnOffsets + (Internal.unTopic topic) + Worker.CommitToKafkaAsWell + ( \partitions -> + partitions + |> List.map (\id -> 
Worker.PartitionOffset {Worker.partitionId = id, Worker.offset = 0}) + |> Task.succeed + ) + ( \partitionOffset retryCount msg -> do + callback partitionOffset retryCount msg + |> STM.atomically + |> map Ok + |> Platform.doAnything (doAnything handler') + ) + ) + |> Async.race_ (returnWhenTerminating handler') + |> Async.async + Async.link async + Prelude.pure (Worker async) + -- | Stops a single worker stopWorker :: Worker -> Expect.Expectation stopWorker (Worker async) = @@ -119,7 +156,7 @@ testHandler Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeou Prelude.pure TestHandler {producer, doAnything, terminator} -- | puts a message synchronously onto a topic-partition -sendSync :: Aeson.ToJSON a => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation +sendSync :: (Aeson.ToJSON a) => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation sendSync handler topicName partitionId msg' = Platform.tracingSpan "Sync send Kafka messages" @@ -175,7 +212,7 @@ record topicName partitionId val = -- | test helper, that yields a new @Kafka.Topic@ and @TestHandler@ test :: - Stack.HasCallStack => + (Stack.HasCallStack) => Text -> ((Internal.Topic, TestHandler) -> Expect.Expectation) -> Test.Test diff --git a/nri-kafka/test/Spec/Kafka/Worker/Integration.hs b/nri-kafka/test/Spec/Kafka/Worker/Integration.hs index 56b75af4..8ee39ea6 100644 --- a/nri-kafka/test/Spec/Kafka/Worker/Integration.hs +++ b/nri-kafka/test/Spec/Kafka/Worker/Integration.hs @@ -4,6 +4,7 @@ import qualified Control.Concurrent.STM as STM import qualified Dict import qualified Expect import qualified Helpers +import qualified Kafka.Worker as Worker import qualified Set import qualified Test import qualified Prelude @@ -55,7 +56,25 @@ tests = [ (1, [2, 1]), (2, [2, 1]) ] + ), + Helpers.test "Self-managing workers get retry count info" <| \(topic, handler) -> do + Helpers.sendSync handler topic 1 1 + msgsTVar <- atomically (STM.newTVar Set.empty) + _ <- + 
Helpers.spawnWorkerManagingOwnOffsets handler topic ( \partitionOffset (Worker.ProcessAttemptsCount retryCount) msg -> do + STM.modifyTVar' msgsTVar (Set.insert (msg, retryCount)) + if retryCount < 1 + then STM.throwSTM (Prelude.userError "retry please") + else + Prelude.pure + <| Worker.SeekToOffset ((Worker.offset partitionOffset) + 1) ) + msgs' <- waitFor msgsTVar (\items -> Set.size items == 1) + -- Assert that only the first retry was recorded: the insert made by the failing attempt (retryCount < 1) is rolled back when throwSTM aborts that transaction + msgs' |> Expect.equal (Set.fromList [(1, 1)]) ] ] @@ -70,7 +89,7 @@ waitFor tVar pred = then Prelude.pure val else STM.retry -groupDictAndMap :: Ord b => (a -> (b, c)) -> List a -> Dict.Dict b (List c) +groupDictAndMap :: (Ord b) => (a -> (b, c)) -> List a -> Dict.Dict b (List c) groupDictAndMap f = List.foldr ( \x -> From 592a9307ec89bcb0fd64dfe648a52837f5fc4009 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 6 Feb 2025 19:43:34 -0300 Subject: [PATCH 2/3] try raising the timeout for nri-kafka --- nri-kafka/test/Main.hs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nri-kafka/test/Main.hs b/nri-kafka/test/Main.hs index 8ec4991a..eb064639 100644 --- a/nri-kafka/test/Main.hs +++ b/nri-kafka/test/Main.hs @@ -2,11 +2,15 @@ module Main (main) where import qualified Spec.Kafka.Worker.Integration import qualified Spec.Kafka.Worker.Partition +import qualified System.Environment import qualified Test import qualified Prelude main :: Prelude.IO () -main = Test.run tests +main = do + -- macos runners seem to be slow and fail on several kafka integration tests + System.Environment.setEnv "NRI_TEST_TIMEOUT" "20000" + Test.run tests tests :: Test.Test tests = From 579f64aa624f379261daf43d52904c078511528c Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Mon, 10 Feb 2025 11:35:11 -0300 Subject: [PATCH 3/3] try a wild timeout (see if timeouts are even working) --- nri-kafka/test/Main.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nri-kafka/test/Main.hs
b/nri-kafka/test/Main.hs index eb064639..b4b1dde7 100644 --- a/nri-kafka/test/Main.hs +++ b/nri-kafka/test/Main.hs @@ -9,7 +9,7 @@ import qualified Prelude main :: Prelude.IO () main = do -- macos runners seem to be slow and fail on several kafka integration tests - System.Environment.setEnv "NRI_TEST_TIMEOUT" "20000" + System.Environment.setEnv "NRI_TEST_TIMEOUT" "240000" Test.run tests tests :: Test.Test