Skip to content

Commit

Permalink
Merge pull request #129 from NoRedInk/expose-retry-counts
Browse files Browse the repository at this point in the history
nri-kafka: expose retry counts (for worker managing its own offsets)
  • Loading branch information
omnibs authored Feb 12, 2025
2 parents 413189e + 579f64a commit e32af41
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 12 deletions.
1 change: 1 addition & 0 deletions nri-kafka/src/Kafka/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Kafka.Worker
Internal.receiveRawMessages,
Internal.PartitionOffset (..),
Partition.SeekCmd (..),
Partition.ProcessAttemptsCount (..),
Internal.CommitToKafkaAsWell (..),
)
where
Expand Down
11 changes: 6 additions & 5 deletions nri-kafka/src/Kafka/Worker/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ data PartitionOffset = PartitionOffset
-- | The partition's offset.
offset :: Int
}
deriving (Show)

-- | Create a subscription for a topic.
--
Expand All @@ -92,7 +93,7 @@ subscription topic callback =
{ topic = Kafka.Topic topic,
onMessage =
Partition.MessageCallback
( \_ msg -> do
( \_ _ msg -> do
callback msg
Task.succeed Partition.NoSeek
),
Expand All @@ -118,30 +119,30 @@ subscription topic callback =
-- > sql
-- > "SELECT partition, offset FROM offsets WHERE partition = %"
-- > [partitions] )
-- > (\msg -> Debug.todo "Process your message here!")
-- > (\retryCount msg -> Debug.todo "Process your message here!")
-- > process settings subscription
subscriptionManageOwnOffsets ::
(Aeson.FromJSON msg, Aeson.ToJSON msg) =>
Text ->
CommitToKafkaAsWell ->
([Int] -> Task Text (List PartitionOffset)) ->
(PartitionOffset -> msg -> Task Text Partition.SeekCmd) ->
(PartitionOffset -> Partition.ProcessAttemptsCount -> msg -> Task Text Partition.SeekCmd) ->
TopicSubscription
subscriptionManageOwnOffsets topic commitToKafkaAsWell fetchOffsets callback =
TopicSubscription
{ topic = Kafka.Topic topic,
commitToKafkaAsWell,
onMessage =
Partition.MessageCallback
( \record msg -> do
( \record retryCount msg -> do
let offsetParams =
PartitionOffset
{ partitionId =
Consumer.crPartition record
|> partitionIdToInt,
offset = Consumer.unOffset (Consumer.crOffset record)
}
callback offsetParams msg
callback offsetParams retryCount msg
),
offsetSource =
Elsewhere
Expand Down
7 changes: 4 additions & 3 deletions nri-kafka/src/Kafka/Worker/Partition.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Kafka.Worker.Partition
SeekCmd (..),
CommitOffsets (..),
MessageFormat (..),
ProcessAttemptsCount (..),
-- just exported for tests
microSecondsDelayForAttempt,
OnStartup (OnStartup),
Expand Down Expand Up @@ -95,7 +96,7 @@ newtype OnCleanup = OnCleanup (Prelude.IO ())
data MessageCallback where
MessageCallback ::
(Show e, Aeson.ToJSON msg, Aeson.FromJSON msg) =>
(Consumer.ConsumerRecord () () -> msg -> Task e SeekCmd) ->
(Consumer.ConsumerRecord () () -> ProcessAttemptsCount -> msg -> Task e SeekCmd) ->
MessageCallback

data CommitOffsets
Expand Down Expand Up @@ -255,7 +256,7 @@ processMsgLoop skipOrNot messageFormat commitOffsets observabilityHandler state
Platform.setTracingSpanDetailsIO log details
handleFailures log <| do
msg <- decodeMessage messageFormat record
runCallback record {Consumer.crKey = (), Consumer.crValue = ()} msg
runCallback record {Consumer.crKey = (), Consumer.crValue = ()} processAttempts msg
|> Task.mapError WorkerCallbackFailed
|> Task.onError
( \err -> do
Expand All @@ -282,7 +283,7 @@ microSecondsDelayForAttempt attempts =
2 Prelude.^ (min attempts 10) * 1_000_000

handleFailures ::
Show e =>
(Show e) =>
Platform.LogHandler ->
Task (WorkerError e) a ->
Prelude.IO ()
Expand Down
41 changes: 39 additions & 2 deletions nri-kafka/test/Helpers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Helpers
stopWorker,
test,
sendSync,
spawnWorkerManagingOwnOffsets,
)
where

Expand Down Expand Up @@ -78,6 +79,42 @@ spawnWorker handler' topic callback =
Async.link async
Prelude.pure (Worker async)

spawnWorkerManagingOwnOffsets ::
(Aeson.ToJSON msg, Aeson.FromJSON msg) =>
TestHandler ->
Internal.Topic ->
(Worker.PartitionOffset -> Worker.ProcessAttemptsCount -> msg -> STM.STM Worker.SeekCmd) ->
Expect.Expectation' Worker
spawnWorkerManagingOwnOffsets handler' topic callback =
Expect.fromIO <| do
settings <-
case Environment.decodeDefaults Worker.Settings.decoder of
Ok settings' -> Prelude.pure settings'
Err err -> Prelude.fail (Text.toList err)
async <-
Kafka.Worker.Internal.processWithoutShutdownEnsurance
settings
(Consumer.ConsumerGroupId "group")
( Worker.subscriptionManageOwnOffsets
(Internal.unTopic topic)
Worker.CommitToKafkaAsWell
( \partitions ->
partitions
|> List.map (\id -> Worker.PartitionOffset {Worker.partitionId = id, Worker.offset = 0})
|> Task.succeed
)
( \partitionOffset retryCount msg -> do
callback partitionOffset retryCount msg
|> STM.atomically
|> map Ok
|> Platform.doAnything (doAnything handler')
)
)
|> Async.race_ (returnWhenTerminating handler')
|> Async.async
Async.link async
Prelude.pure (Worker async)

-- | Stops a single worker
stopWorker :: Worker -> Expect.Expectation
stopWorker (Worker async) =
Expand Down Expand Up @@ -119,7 +156,7 @@ testHandler Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeou
Prelude.pure TestHandler {producer, doAnything, terminator}

-- | puts a message synchronously onto a topic-partition
sendSync :: Aeson.ToJSON a => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation
sendSync :: (Aeson.ToJSON a) => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation
sendSync handler topicName partitionId msg' =
Platform.tracingSpan
"Sync send Kafka messages"
Expand Down Expand Up @@ -175,7 +212,7 @@ record topicName partitionId val =

-- | test helper, that yields a new @Kafka.Topic@ and @TestHandler@
test ::
Stack.HasCallStack =>
(Stack.HasCallStack) =>
Text ->
((Internal.Topic, TestHandler) -> Expect.Expectation) ->
Test.Test
Expand Down
6 changes: 5 additions & 1 deletion nri-kafka/test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ module Main (main) where

import qualified Spec.Kafka.Worker.Integration
import qualified Spec.Kafka.Worker.Partition
import qualified System.Environment
import qualified Test
import qualified Prelude

main :: Prelude.IO ()
main = Test.run tests
main = do
-- macos runners seem to be slow and fail on several kafka integration tests
System.Environment.setEnv "NRI_TEST_TIMEOUT" "240000"
Test.run tests

tests :: Test.Test
tests =
Expand Down
21 changes: 20 additions & 1 deletion nri-kafka/test/Spec/Kafka/Worker/Integration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import qualified Control.Concurrent.STM as STM
import qualified Dict
import qualified Expect
import qualified Helpers
import qualified Kafka.Worker as Worker
import qualified Set
import qualified Test
import qualified Prelude
Expand Down Expand Up @@ -55,7 +56,25 @@ tests =
[ (1, [2, 1]),
(2, [2, 1])
]
),
Helpers.test "Self-managing workers get retry count info" <| \(topic, handler) -> do
Helpers.sendSync handler topic 1 1
msgsTVar <- atomically (STM.newTVar Set.empty)
_ <-
Helpers.spawnWorkerManagingOwnOffsets
handler
topic
( \partitionOffset (Worker.ProcessAttemptsCount retryCount) msg -> do
STM.modifyTVar' msgsTVar (Set.insert (msg, retryCount))
if retryCount < 1
then STM.throwSTM (Prelude.userError "retry please")
else
Prelude.pure
<| Worker.SeekToOffset ((Worker.offset partitionOffset) + 1)
)
msgs' <- waitFor msgsTVar (\items -> Set.size items == 1)
-- Assert that the message was recorded on its first retry
msgs' |> Expect.equal (Set.fromList [(1, 1)])
]
]

Expand All @@ -70,7 +89,7 @@ waitFor tVar pred =
then Prelude.pure val
else STM.retry

groupDictAndMap :: Ord b => (a -> (b, c)) -> List a -> Dict.Dict b (List c)
groupDictAndMap :: (Ord b) => (a -> (b, c)) -> List a -> Dict.Dict b (List c)
groupDictAndMap f =
List.foldr
( \x ->
Expand Down

0 comments on commit e32af41

Please sign in to comment.