Skip to content

Commit

Permalink
Merge pull request #33 from openownership/254-tracking
Browse files Browse the repository at this point in the history
254 tracking
  • Loading branch information
tiredpixel authored May 10, 2024
2 parents a1ffdbb + fdfc16c commit 625564e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
13 changes: 5 additions & 8 deletions lib/register_common/services/stream_client_kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,9 @@ def consume(consumer_id, limit: nil)

last_record = nil
resp.records.each do |record|
tag = begin
JSON.parse(record.data, symbolize_names: true)[:data][:links][:self]
rescue JSON::ParserError
nil
end
@logger.info "[#{shard_id}] [#{record.sequence_number}] #{tag}"
record_h = JSON.parse(record.data, symbolize_names: true)
@logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}"

yield msg_handler.process(record.data)

record_count += 1
Expand All @@ -85,7 +82,7 @@ def consume(consumer_id, limit: nil)
end

iterators[shard_id] = resp.next_shard_iterator
store_sequence_number(consumer_id, shard_id, last_record.sequence_number)
store_sequence_number(consumer_id, shard_id, last_record.sequence_number) if last_record

break if complete
end
Expand Down Expand Up @@ -144,7 +141,7 @@ def store_sequence_number(consumer_id, shard_id, sequence_number)
end

def redis_key(consumer_id, shard_id)
"kinesis_#{consumer_id}_#{shard_id}"
['kinesis', consumer_id, shard_id].join('_')
end
end
end
Expand Down
39 changes: 39 additions & 0 deletions lib/register_common/utils/expiring_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

require 'redis'

module RegisterCommon
module Utils
class ExpiringSet
def initialize(redis: nil, namespace: nil, ttl: 60)
@redis = redis
@namespace = namespace
@ttl = ttl
end

def sadd(key, member)
t = Time.now.utc.to_i / @ttl
idx = _k_idx(key)
key1 = [idx, t].join('/')
@redis.sadd(idx, key1)
@redis.sadd(key1, member)
@redis.expire(key1, @ttl * 2, nx: true)
end

def sismember(key, member)
idx = _k_idx(key)
@redis.smembers(idx).each do |key1|
@redis.srem(idx, key1) unless @redis.exists?(key1)
return true if @redis.sismember(key1, member)
end
false
end

private

def _k_idx(key)
[@namespace, key].join('/')
end
end
end
end
16 changes: 12 additions & 4 deletions spec/unit/services/stream_client_kinesis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
shard_iterator = 'iterator1'
next_shard_iterator = 'next-iterator1'
shards = double 'shards', shards: shard_ids.map { |shard_id| double('shard_id', shard_id:) }
records = [double('record', data: 'data1', sequence_number: 'seq1')]
record_h = { data: { links: { self: 'data1' } } }
records = [double('record', data: record_h.to_json, sequence_number: 'seq1')]

expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return nil
allow(redis).to receive(:set)
allow(redis).to receive(:smembers).and_return Set.new
allow(redis).to receive(:sadd)
allow(redis).to receive(:expire)

expect(client).to receive(:list_shards).with(
{ stream_name: }
Expand All @@ -52,7 +56,7 @@
records = []
subject.consume(consumer_id, limit: 1) { |record| records << record }

expect(records).to eq ['data1']
expect(records).to eq [record_h.to_json]
expect(redis).to have_received(:set).with(
'kinesis_consumer_id_shard1', 'seq1'
)
Expand All @@ -65,10 +69,14 @@
shard_iterator = 'iterator1'
next_shard_iterator = 'next-iterator1'
shards = double 'shards', shards: shard_ids.map { |shard_id| double('shard_id', shard_id:) }
records = [double('record', data: 'data1', sequence_number: 'seq1')]
record_h = { data: { links: { self: 'data1' } } }
records = [double('record', data: record_h.to_json, sequence_number: 'seq1')]

expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return 'stored-seq'
allow(redis).to receive(:set)
allow(redis).to receive(:smembers).and_return Set.new
allow(redis).to receive(:sadd)
allow(redis).to receive(:expire)

expect(client).to receive(:list_shards).with(
{ stream_name: }
Expand All @@ -92,7 +100,7 @@
records = []
subject.consume(consumer_id, limit: 1) { |record| records << record }

expect(records).to eq ['data1']
expect(records).to eq [record_h.to_json]
expect(redis).to have_received(:set).with(
'kinesis_consumer_id_shard1', 'seq1'
)
Expand Down

0 comments on commit 625564e

Please sign in to comment.