diff --git a/lib/register_common/services/stream_client_kinesis.rb b/lib/register_common/services/stream_client_kinesis.rb index 5ee4df2..5e877fe 100644 --- a/lib/register_common/services/stream_client_kinesis.rb +++ b/lib/register_common/services/stream_client_kinesis.rb @@ -67,12 +67,9 @@ def consume(consumer_id, limit: nil) last_record = nil resp.records.each do |record| - tag = begin - JSON.parse(record.data, symbolize_names: true)[:data][:links][:self] - rescue JSON::ParserError - nil - end - @logger.info "[#{shard_id}] [#{record.sequence_number}] #{tag}" + record_h = JSON.parse(record.data, symbolize_names: true) + @logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}" + yield msg_handler.process(record.data) record_count += 1 @@ -85,7 +82,7 @@ def consume(consumer_id, limit: nil) end iterators[shard_id] = resp.next_shard_iterator - store_sequence_number(consumer_id, shard_id, last_record.sequence_number) + store_sequence_number(consumer_id, shard_id, last_record.sequence_number) if last_record break if complete end @@ -144,7 +141,7 @@ def store_sequence_number(consumer_id, shard_id, sequence_number) end def redis_key(consumer_id, shard_id) - "kinesis_#{consumer_id}_#{shard_id}" + ['kinesis', consumer_id, shard_id].join('_') end end end diff --git a/lib/register_common/utils/expiring_set.rb b/lib/register_common/utils/expiring_set.rb new file mode 100644 index 0000000..a4b74ef --- /dev/null +++ b/lib/register_common/utils/expiring_set.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require 'redis' + +module RegisterCommon + module Utils + class ExpiringSet + def initialize(redis: nil, namespace: nil, ttl: 60) + @redis = redis + @namespace = namespace + @ttl = ttl + end + + def sadd(key, member) + t = Time.now.utc.to_i / @ttl + idx = _k_idx(key) + key1 = [idx, t].join('/') + @redis.sadd(idx, key1) + @redis.sadd(key1, member) + @redis.expire(key1, @ttl * 2, nx: true) + end + + def sismember(key, member) + idx = _k_idx(key) + @redis.smembers(idx).each do |key1| + @redis.srem(idx, key1) unless @redis.exists?(key1) + return true if @redis.sismember(key1, member) + end + false + end + + private + + def _k_idx(key) + [@namespace, key].join('/') + end + end + end +end diff --git a/spec/unit/services/stream_client_kinesis_spec.rb b/spec/unit/services/stream_client_kinesis_spec.rb index 956eef2..585e63d 100644 --- a/spec/unit/services/stream_client_kinesis_spec.rb +++ b/spec/unit/services/stream_client_kinesis_spec.rb @@ -25,10 +25,14 @@ shard_iterator = 'iterator1' next_shard_iterator = 'next-iterator1' shards = double 'shards', shards: shard_ids.map { |shard_id| double('shard_id', shard_id:) } - records = [double('record', data: 'data1', sequence_number: 'seq1')] + record_h = { data: { links: { self: 'data1' } } } + records = [double('record', data: record_h.to_json, sequence_number: 'seq1')] expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return nil allow(redis).to receive(:set) + allow(redis).to receive(:smembers).and_return Set.new + allow(redis).to receive(:sadd) + allow(redis).to receive(:expire) expect(client).to receive(:list_shards).with( { stream_name: } @@ -52,7 +56,7 @@ records = [] subject.consume(consumer_id, limit: 1) { |record| records << record } - expect(records).to eq ['data1'] + expect(records).to eq [record_h.to_json] expect(redis).to have_received(:set).with( 'kinesis_consumer_id_shard1', 'seq1' ) @@ -65,10 +69,14 @@ shard_iterator = 'iterator1' next_shard_iterator = 'next-iterator1' shards = double 'shards', shards: shard_ids.map { |shard_id| double('shard_id', shard_id:) } - records = [double('record', data: 'data1', sequence_number: 'seq1')] + record_h = { data: { links: { self: 'data1' } } } + records = [double('record', data: record_h.to_json, sequence_number: 'seq1')] expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return 'stored-seq' allow(redis).to receive(:set) + allow(redis).to receive(:smembers).and_return Set.new + allow(redis).to receive(:sadd) + allow(redis).to receive(:expire) expect(client).to receive(:list_shards).with( { stream_name: } @@ -92,7 +100,7 @@ records = [] subject.consume(consumer_id, limit: 1) { |record| records << record } - expect(records).to eq ['data1'] + expect(records).to eq [record_h.to_json] expect(redis).to have_received(:set).with( 'kinesis_consumer_id_shard1', 'seq1' )