forked from haskell-works/hw-kafka-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumerExample.hs
64 lines (56 loc) · 2.37 KB
/
ConsumerExample.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module ConsumerExample
where
import Control.Arrow ((&&&))
import Control.Exception (bracket)
import Kafka.Consumer
import Data.Text (Text)
-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList ["localhost:9092"]
<> groupId "consumer_example_group"
<> noAutoCommit
<> setCallback (rebalanceCallback printingRebalanceCallback)
<> setCallback (offsetCommitCallback printingOffsetCallback)
<> logLevel KafkaLogInfo
-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics ["kafka-client-example-topic"]
<> offsetReset Earliest
-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
print $ cpLogLevel consumerProps
res <- bracket mkConsumer clConsumer runHandler
print res
where
mkConsumer = newConsumer consumerProps consumerSub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = processMessages kc
-------------------------------------------------------------------
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
mapM_ (\_ -> do
msg1 <- pollMessage kafka (Timeout 1000)
putStrLn $ "Message: " <> show msg1
err <- commitAllOffsets OffsetCommit kafka
putStrLn $ "Offsets: " <> maybe "Committed." show err
) [0 :: Integer .. 10]
return $ Right ()
printingRebalanceCallback :: KafkaConsumer -> RebalanceEvent -> IO ()
printingRebalanceCallback _ e = case e of
RebalanceBeforeAssign ps ->
putStrLn $ "[Rebalance] About to assign partitions: " <> show ps
RebalanceAssign ps ->
putStrLn $ "[Rebalance] Assign partitions: " <> show ps
RebalanceBeforeRevoke ps ->
putStrLn $ "[Rebalance] About to revoke partitions: " <> show ps
RebalanceRevoke ps ->
putStrLn $ "[Rebalance] Revoke partitions: " <> show ps
printingOffsetCallback :: KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()
printingOffsetCallback _ e ps = do
print ("Offsets callback:" ++ show e)
mapM_ (print . (tpTopicName &&& tpPartition &&& tpOffset)) ps