require "json"
require_relative "./api"
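
# `api.rb` isn't shown here; it's assumed to define the shared dependencies
# referenced below: `DB` (a Sequel database connection), `RDB` (a Redis
# client), the `StagedLogRecord` Sequel model over the staging table, and the
# `STREAM_NAME` and `STREAM_MAXLEN` constants describing the target stream.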

class Streamer
  def run
    loop do
      # Simulate a double send some of the time.
      num_streamed = run_once(send_twice: rand < 0.10)

      # Sleep for a while if we didn't find anything to stream on the last
      # run.
      if num_streamed == 0
        $stdout.puts "Sleeping for #{SLEEP_DURATION}"
        sleep(SLEEP_DURATION)
      end
    end
  end

  # Runs the process one time: all staged records are iterated through, pushed
  # into the stream, and then removed.
  #
  # The special `send_twice` parameter is there to simulate failure in the
  # system. In some cases this loop might fail midway through, so that some
  # records were added to Redis and some weren't. The process will retry and
  # probably succeed the second time through, but even so the stream will now
  # contain duplicated records. `send_twice` adds all staged records to the
  # stream twice to simulate this type of event.
  def run_once(send_twice: false)
    num_streamed = 0

    # Need at least the repeatable read isolation level so that our DELETE
    # after enqueueing will see the same records as the original SELECT.
    DB.transaction(isolation: :repeatable) do
      records = StagedLogRecord.order(:id).limit(BATCH_SIZE).all

      unless records.empty?
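        # MULTI/EXEC runs the queued XADDs atomically, so a reader never
        # observes a partially written batch: either every record in this
        # batch reaches the stream, or none of them do.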
        RDB.multi do
          records.each do |record|
            stream(record.data)
            num_streamed += 1

            # Simulate a double send by adding the same record again.
            if send_twice
              stream(record.data)
              num_streamed += 1
            end

            $stdout.puts "Enqueued record: #{record.action} #{record.object}"
          end
        end
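
        # Under REPEATABLE READ this DELETE operates on the same snapshot as
        # the SELECT above, so `id <= records.last.id` removes exactly the
        # records that were just streamed.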
        StagedLogRecord.where(Sequel.lit("id <= ?", records.last.id)).delete
      end
    end

    num_streamed
  end

  #
  # private
  #

  # Number of records to try to stream in each batch.
  BATCH_SIZE = 1000
  private_constant :BATCH_SIZE

  # Duration in seconds to sleep in case we ran but didn't find anything to
  # stream.
  SLEEP_DURATION = 5
  private_constant :SLEEP_DURATION

  private def stream(data)
    # XADD mystream MAXLEN ~ 10000 * data <JSON-encoded blob>
    #
    # MAXLEN ~ 10000 caps the stream at roughly that many entries (the "~"
    # trades precision for speed) so that it doesn't grow in a purely
    # unbounded way.
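    #
    # The "*" ID asks Redis to auto-assign the new entry's ID. Note that this
    # positional-argument form assumes a client that passes the raw command
    # arguments straight through to Redis.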
    RDB.xadd(STREAM_NAME, "MAXLEN", "~", STREAM_MAXLEN, "*", "data", JSON.generate(data))
  end
end

#
# run
#

if __FILE__ == $0
  # Sync both streams so that output appears promptly in Forego.
  $stderr.sync = true
  $stdout.sync = true

  Streamer.new.run
end