Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nri-kafka: expose retry counts (for worker managing its own offsets) #129

Merged
merged 3 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nri-kafka/src/Kafka/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Kafka.Worker
Internal.receiveRawMessages,
Internal.PartitionOffset (..),
Partition.SeekCmd (..),
Partition.ProcessAttemptsCount (..),
Internal.CommitToKafkaAsWell (..),
)
where
Expand Down
11 changes: 6 additions & 5 deletions nri-kafka/src/Kafka/Worker/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ data PartitionOffset = PartitionOffset
-- | The partition's offset.
offset :: Int
}
deriving (Show)

-- | Create a subscription for a topic.
--
Expand All @@ -92,7 +93,7 @@ subscription topic callback =
{ topic = Kafka.Topic topic,
onMessage =
Partition.MessageCallback
( \_ msg -> do
( \_ _ msg -> do
callback msg
Task.succeed Partition.NoSeek
),
Expand All @@ -118,30 +119,30 @@ subscription topic callback =
-- > sql
-- > "SELECT partition, offset FROM offsets WHERE partition = %"
-- > [partitions] )
-- > (\msg -> Debug.todo "Process your message here!")
-- > (\retryCount msg -> Debug.todo "Process your message here!")
-- > process settings subscription
subscriptionManageOwnOffsets ::
(Aeson.FromJSON msg, Aeson.ToJSON msg) =>
Text ->
CommitToKafkaAsWell ->
([Int] -> Task Text (List PartitionOffset)) ->
(PartitionOffset -> msg -> Task Text Partition.SeekCmd) ->
(PartitionOffset -> Partition.ProcessAttemptsCount -> msg -> Task Text Partition.SeekCmd) ->
TopicSubscription
subscriptionManageOwnOffsets topic commitToKafkaAsWell fetchOffsets callback =
TopicSubscription
{ topic = Kafka.Topic topic,
commitToKafkaAsWell,
onMessage =
Partition.MessageCallback
( \record msg -> do
( \record retryCount msg -> do
let offsetParams =
PartitionOffset
{ partitionId =
Consumer.crPartition record
|> partitionIdToInt,
offset = Consumer.unOffset (Consumer.crOffset record)
}
callback offsetParams msg
callback offsetParams retryCount msg
),
offsetSource =
Elsewhere
Expand Down
7 changes: 4 additions & 3 deletions nri-kafka/src/Kafka/Worker/Partition.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Kafka.Worker.Partition
SeekCmd (..),
CommitOffsets (..),
MessageFormat (..),
ProcessAttemptsCount (..),
-- just exported for tests
microSecondsDelayForAttempt,
OnStartup (OnStartup),
Expand Down Expand Up @@ -95,7 +96,7 @@ newtype OnCleanup = OnCleanup (Prelude.IO ())
data MessageCallback where
MessageCallback ::
(Show e, Aeson.ToJSON msg, Aeson.FromJSON msg) =>
(Consumer.ConsumerRecord () () -> msg -> Task e SeekCmd) ->
(Consumer.ConsumerRecord () () -> ProcessAttemptsCount -> msg -> Task e SeekCmd) ->
MessageCallback

data CommitOffsets
Expand Down Expand Up @@ -255,7 +256,7 @@ processMsgLoop skipOrNot messageFormat commitOffsets observabilityHandler state
Platform.setTracingSpanDetailsIO log details
handleFailures log <| do
msg <- decodeMessage messageFormat record
runCallback record {Consumer.crKey = (), Consumer.crValue = ()} msg
runCallback record {Consumer.crKey = (), Consumer.crValue = ()} processAttempts msg
|> Task.mapError WorkerCallbackFailed
|> Task.onError
( \err -> do
Expand All @@ -282,7 +283,7 @@ microSecondsDelayForAttempt attempts =
2 Prelude.^ (min attempts 10) * 1_000_000

handleFailures ::
Show e =>
(Show e) =>
Platform.LogHandler ->
Task (WorkerError e) a ->
Prelude.IO ()
Expand Down
41 changes: 39 additions & 2 deletions nri-kafka/test/Helpers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Helpers
stopWorker,
test,
sendSync,
spawnWorkerManagingOwnOffsets,
)
where

Expand Down Expand Up @@ -78,6 +79,42 @@ spawnWorker handler' topic callback =
Async.link async
Prelude.pure (Worker async)

spawnWorkerManagingOwnOffsets ::
(Aeson.ToJSON msg, Aeson.FromJSON msg) =>
TestHandler ->
Internal.Topic ->
(Worker.PartitionOffset -> Worker.ProcessAttemptsCount -> msg -> STM.STM Worker.SeekCmd) ->
Expect.Expectation' Worker
spawnWorkerManagingOwnOffsets handler' topic callback =
Expect.fromIO <| do
settings <-
case Environment.decodeDefaults Worker.Settings.decoder of
Ok settings' -> Prelude.pure settings'
Err err -> Prelude.fail (Text.toList err)
async <-
Kafka.Worker.Internal.processWithoutShutdownEnsurance
settings
(Consumer.ConsumerGroupId "group")
( Worker.subscriptionManageOwnOffsets
(Internal.unTopic topic)
Worker.CommitToKafkaAsWell
( \partitions ->
partitions
|> List.map (\id -> Worker.PartitionOffset {Worker.partitionId = id, Worker.offset = 0})
|> Task.succeed
)
( \partitionOffset retryCount msg -> do
callback partitionOffset retryCount msg
|> STM.atomically
|> map Ok
|> Platform.doAnything (doAnything handler')
)
)
|> Async.race_ (returnWhenTerminating handler')
|> Async.async
Async.link async
Prelude.pure (Worker async)

-- | Stops a single worker
stopWorker :: Worker -> Expect.Expectation
stopWorker (Worker async) =
Expand Down Expand Up @@ -119,7 +156,7 @@ testHandler Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeou
Prelude.pure TestHandler {producer, doAnything, terminator}

-- | puts a message synchronously onto a topic-partition
sendSync :: Aeson.ToJSON a => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation
sendSync :: (Aeson.ToJSON a) => TestHandler -> Internal.Topic -> Int -> a -> Expect.Expectation
sendSync handler topicName partitionId msg' =
Platform.tracingSpan
"Sync send Kafka messages"
Expand Down Expand Up @@ -175,7 +212,7 @@ record topicName partitionId val =

-- | test helper, that yields a new @Kafka.Topic@ and @TestHandler@
test ::
Stack.HasCallStack =>
(Stack.HasCallStack) =>
Text ->
((Internal.Topic, TestHandler) -> Expect.Expectation) ->
Test.Test
Expand Down
6 changes: 5 additions & 1 deletion nri-kafka/test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ module Main (main) where

import qualified Spec.Kafka.Worker.Integration
import qualified Spec.Kafka.Worker.Partition
import qualified System.Environment
import qualified Test
import qualified Prelude

main :: Prelude.IO ()
main = Test.run tests
main = do
-- macos runners seem to be slow and fail on several kafka integration tests
System.Environment.setEnv "NRI_TEST_TIMEOUT" "240000"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow this is a lot.
I thought it was just a temporary thing while you were debugging but I see it's still there.
I guess if we need it, we need it!
How long do the tests actually take?

Test.run tests

tests :: Test.Test
tests =
Expand Down
21 changes: 20 additions & 1 deletion nri-kafka/test/Spec/Kafka/Worker/Integration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import qualified Control.Concurrent.STM as STM
import qualified Dict
import qualified Expect
import qualified Helpers
import qualified Kafka.Worker as Worker
import qualified Set
import qualified Test
import qualified Prelude
Expand Down Expand Up @@ -55,7 +56,25 @@ tests =
[ (1, [2, 1]),
(2, [2, 1])
]
),
Helpers.test "Self-managing workers get retry count info" <| \(topic, handler) -> do
Helpers.sendSync handler topic 1 1
msgsTVar <- atomically (STM.newTVar Set.empty)
_ <-
Helpers.spawnWorkerManagingOwnOffsets
handler
topic
( \partitionOffset (Worker.ProcessAttemptsCount retryCount) msg -> do
STM.modifyTVar' msgsTVar (Set.insert (msg, retryCount))
if retryCount < 1
then STM.throwSTM (Prelude.userError "retry please")
else
Prelude.pure
<| Worker.SeekToOffset ((Worker.offset partitionOffset) + 1)
)
msgs' <- waitFor msgsTVar (\items -> Set.size items == 1)
-- Assert that the message was recorded on its first retry
msgs' |> Expect.equal (Set.fromList [(1, 1)])
]
]

Expand All @@ -70,7 +89,7 @@ waitFor tVar pred =
then Prelude.pure val
else STM.retry

groupDictAndMap :: Ord b => (a -> (b, c)) -> List a -> Dict.Dict b (List c)
groupDictAndMap :: (Ord b) => (a -> (b, c)) -> List a -> Dict.Dict b (List c)
groupDictAndMap f =
List.foldr
( \x ->
Expand Down