-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSample3.aplf
67 lines (56 loc) · 1.58 KB
/
Sample3.aplf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
Sample3 n;i;start;cr
⍝ Topics created with
⍝
⍝ kafka-topics.sh \
⍝ --bootstrap-server localhost:9092 \
⍝ --create --topic "animals" \
⍝ --partitions 3
⍝
⍝ kafka-topics.sh \
⍝ --bootstrap-server localhost:9092 \
⍝ --create --topic "cars" \
⍝ --partitions 3
⍝
⍝ kafka-topics.sh \
⍝ --bootstrap-server localhost:9092 \
⍝ --create --topic "plants" \
⍝ --partitions 3
⍝ Call to Init function
Init 'path/to/dir/housing/kafka/shared/lib'
config←0 2⍴⍬
config⍪←'bootstrap.servers' 'localhost:9092'
config⍪←'client.id' 'bhcgrs3550'
config⍪←'group.id' 'dyalog'
consumer←⎕NEW Consumer config
consumer.subscribe'animals' 'cars' 'plants'
⎕DL 5
config←0 2⍴⍬
config⍪←'bootstrap.servers' 'localhost:9092'
config⍪←'client.id' 'bhc'
producer←⎕NEW Producer config
:For i :In ⍳n
producer.produce_record ⎕NEW #.Record('animals'(100↑'Payload',⍕i)('key',⍕4|i))
:If 0=10|i
producer.update_outstanding
:EndIf
:EndFor
producer.produce_record ⎕NEW #.Record('cars' 'ferrari' 'sportcars')
producer.produce_record ⎕NEW #.Record('plants' 'iris' 'flowers' 0)
producer.update_outstanding
start←3⊃⎕AI
:While 1
cr←consumer.consume_record
:If 1=⊃cr
:AndIf 20000>(3⊃⎕AI)-start
⎕DL 0.2
:Continue
:ElseIf 0=⊃cr
(2⊃cr).(Topic Payload Key Partition)
start←0
:Else
cr
:Leave
:EndIf
:EndWhile
⎕EX'producer'
⎕EX'consumer'