From 6223d7c36a33c4b784d9dd28320fb5de48e048b5 Mon Sep 17 00:00:00 2001 From: Nicolas Williams Date: Fri, 10 May 2024 11:59:56 +0000 Subject: [PATCH 1/4] [#254] refactor StreamClientKinesis consume tag into record_h --- .rubocop.yml | 2 ++ lib/register_common/services/stream_client_kinesis.rb | 8 ++------ spec/unit/services/stream_client_kinesis_spec.rb | 10 ++++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index 27b5c8c..c07fc55 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,6 +7,8 @@ AllCops: NewCops: enable Metrics/AbcSize: Enabled: false +Metrics/BlockLength: + Enabled: false Metrics/ClassLength: Enabled: false Metrics/MethodLength: diff --git a/lib/register_common/services/stream_client_kinesis.rb b/lib/register_common/services/stream_client_kinesis.rb index 5ee4df2..4b8fd83 100644 --- a/lib/register_common/services/stream_client_kinesis.rb +++ b/lib/register_common/services/stream_client_kinesis.rb @@ -67,12 +67,8 @@ def consume(consumer_id, limit: nil) last_record = nil resp.records.each do |record| - tag = begin - JSON.parse(record.data, symbolize_names: true)[:data][:links][:self] - rescue JSON::ParserError - nil - end - @logger.info "[#{shard_id}] [#{record.sequence_number}] #{tag}" + record_h = JSON.parse(record.data, symbolize_names: true) + @logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}" yield msg_handler.process(record.data) record_count += 1 diff --git a/spec/unit/services/stream_client_kinesis_spec.rb b/spec/unit/services/stream_client_kinesis_spec.rb index 956eef2..e394255 100644 --- a/spec/unit/services/stream_client_kinesis_spec.rb +++ b/spec/unit/services/stream_client_kinesis_spec.rb @@ -25,7 +25,8 @@ shard_iterator = 'iterator1' next_shard_iterator = 'next-iterator1' shards = double 'shards', shards: shard_ids.map { |shard_id| double('shard_id', shard_id:) } - records = [double('record', data: 'data1', sequence_number: 'seq1')] + record_h = { data: { links: { self: 'data1' } } } + records = [double('record', data: record_h.to_json, sequence_number: 'seq1')] expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return nil allow(redis).to receive(:set) @@ -52,7 +53,7 @@ records = [] subject.consume(consumer_id, limit: 1) { |record| records << record } - expect(records).to eq ['data1'] + expect(records).to eq [record_h.to_json] expect(redis).to have_received(:set).with( 'kinesis_consumer_id_shard1', 'seq1' ) @@ -65,7 +66,8 @@ shard_iterator = 'iterator1' next_shard_iterator = 'next-iterator1' shards = double 'shards', shards: shard_ids.map { |shard_id| double('shard_id', shard_id:) } - records = [double('record', data: 'data1', sequence_number: 'seq1')] + record_h = { data: { links: { self: 'data1' } } } + records = [double('record', data: record_h.to_json, sequence_number: 'seq1')] expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return 'stored-seq' allow(redis).to receive(:set) @@ -92,7 +94,7 @@ records = [] subject.consume(consumer_id, limit: 1) { |record| records << record } - expect(records).to eq ['data1'] + expect(records).to eq [record_h.to_json] expect(redis).to have_received(:set).with( 'kinesis_consumer_id_shard1', 'seq1' ) From 4ff33f25da9561ad8b6b264981ef0e5ca1b22d09 Mon Sep 17 00:00:00 2001 From: Nicolas Williams Date: Fri, 10 May 2024 13:45:24 +0000 Subject: [PATCH 2/4] [#254] extend StreamClientKinesis to track transformed in Redis --- .../services/stream_client_kinesis.rb | 40 ++++++++++++++++--- .../services/stream_client_kinesis_spec.rb | 6 +++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/lib/register_common/services/stream_client_kinesis.rb b/lib/register_common/services/stream_client_kinesis.rb index 4b8fd83..aeed88e 100644 --- a/lib/register_common/services/stream_client_kinesis.rb +++ b/lib/register_common/services/stream_client_kinesis.rb @@ -17,6 +17,8 @@ module Services class SequenceError < StandardError; end class StreamClientKinesis + RECORD_TRANSFORMED_TTL = 172_800 # 48h + # rubocop:disable Metrics/ParameterLists def initialize( credentials:, stream_name:, msg_handler: nil, s3_adapter: nil, s3_bucket: nil, redis: nil, @@ -69,8 +71,14 @@ def consume(consumer_id, limit: nil) resp.records.each do |record| record_h = JSON.parse(record.data, symbolize_names: true) @logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}" + if record_transformed?(consumer_id, record_h[:data][:etag]) + @logger.debug "[#{shard_id}] SKIP: #{record_h[:data][:etag]}" + next + end + yield msg_handler.process(record.data) + record_transformed(consumer_id, record_h[:data][:etag]) record_count += 1 last_record = record @@ -81,7 +89,7 @@ def consume(consumer_id, limit: nil) end iterators[shard_id] = resp.next_shard_iterator - store_sequence_number(consumer_id, shard_id, last_record.sequence_number) + store_sequence_number(consumer_id, shard_id, last_record.sequence_number) if last_record break if complete end @@ -126,12 +134,12 @@ def get_records(shard_iterator) end def get_sequence_number(consumer_id, shard_id) - key = redis_key(consumer_id, shard_id) + key = redis_key_seq(consumer_id, shard_id) redis.get key end def store_sequence_number(consumer_id, shard_id, sequence_number) - key = redis_key(consumer_id, shard_id) + key = redis_key_seq(consumer_id, shard_id) if sequence_number redis.set(key, sequence_number) else @@ -139,8 +147,30 @@ def store_sequence_number(consumer_id, shard_id, sequence_number) end end - def redis_key(consumer_id, shard_id) - "kinesis_#{consumer_id}_#{shard_id}" + def redis_key_seq(consumer_id, shard_id) + ['kinesis', consumer_id, shard_id].join('_') + end + + def redis_key_transformed(consumer_id) + ['kinesis', consumer_id, 'transformed'].join('_') + end + + def record_transformed?(consumer_id, etag) + k0 = redis_key_transformed(consumer_id) + redis.smembers(k0).each do |k1| + redis.srem(k0, k1) unless redis.exists?(k1) + return true if redis.sismember(k1, etag) + end + false + end + + def record_transformed(consumer_id, etag) + k0 = redis_key_transformed(consumer_id) + t = Time.now.utc.to_i / RECORD_TRANSFORMED_TTL + k1 = [k0, t].join('_') + redis.sadd(k0, k1) + redis.sadd(k1, etag) + redis.expire(k1, RECORD_TRANSFORMED_TTL * 2, nx: true) end end end diff --git a/spec/unit/services/stream_client_kinesis_spec.rb b/spec/unit/services/stream_client_kinesis_spec.rb index e394255..585e63d 100644 --- a/spec/unit/services/stream_client_kinesis_spec.rb +++ b/spec/unit/services/stream_client_kinesis_spec.rb @@ -30,6 +30,9 @@ expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return nil allow(redis).to receive(:set) + allow(redis).to receive(:smembers).and_return Set.new + allow(redis).to receive(:sadd) + allow(redis).to receive(:expire) expect(client).to receive(:list_shards).with( { stream_name: } @@ -71,6 +74,9 @@ expect(redis).to receive(:get).with('kinesis_consumer_id_shard1').and_return 'stored-seq' allow(redis).to receive(:set) + allow(redis).to receive(:smembers).and_return Set.new + allow(redis).to receive(:sadd) + allow(redis).to receive(:expire) expect(client).to receive(:list_shards).with( { stream_name: } From 8c720b4a99628aac2f9bccac50ac0e6306f25834 Mon Sep 17 00:00:00 2001 From: Nicolas Williams Date: Fri, 10 May 2024 15:10:11 +0000 Subject: [PATCH 3/4] [#254] extract into Utils::ExpiringSet --- .../services/stream_client_kinesis.rb | 34 +++++----------- lib/register_common/utils/expiring_set.rb | 39 +++++++++++++++++++ 2 files changed, 48 insertions(+), 25 deletions(-) create mode 100644 lib/register_common/utils/expiring_set.rb diff --git a/lib/register_common/services/stream_client_kinesis.rb b/lib/register_common/services/stream_client_kinesis.rb index aeed88e..077b008 100644 --- a/lib/register_common/services/stream_client_kinesis.rb +++ b/lib/register_common/services/stream_client_kinesis.rb @@ -5,6 +5,7 @@ require 'logger' require 'redis' +require_relative '../utils/expiring_set' require_relative 'msg_handler' module RegisterCommon @@ -17,7 +18,8 @@ module Services class SequenceError < StandardError; end class StreamClientKinesis - RECORD_TRANSFORMED_TTL = 172_800 # 48h + REDIS_TRANSFORMED_KEY = 'transformed' + REDIS_TRANSFORMED_TTL = 172_800 # 48h # rubocop:disable Metrics/ParameterLists def initialize( @@ -38,6 +40,10 @@ def initialize( # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity def consume(consumer_id, limit: nil) + exp_set = Utils::ExpiringSet.new( + redis: @redis, namespace: consumer_id, ttl: REDIS_TRANSFORMED_TTL + ) + shard_ids = list_shards sequence_numbers = shard_ids.to_h do |shard_id| @@ -71,14 +77,14 @@ def consume(consumer_id, limit: nil) resp.records.each do |record| record_h = JSON.parse(record.data, symbolize_names: true) @logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}" - if record_transformed?(consumer_id, record_h[:data][:etag]) + if exp_set.sismember(REDIS_TRANSFORMED_KEY, record_h[:data][:etag]) @logger.debug "[#{shard_id}] SKIP: #{record_h[:data][:etag]}" next end yield msg_handler.process(record.data) - record_transformed(consumer_id, record_h[:data][:etag]) + exp_set.sadd(REDIS_TRANSFORMED_KEY, record_h[:data][:etag]) record_count += 1 last_record = record @@ -150,28 +156,6 @@ def store_sequence_number(consumer_id, shard_id, sequence_number) def redis_key_seq(consumer_id, shard_id) ['kinesis', consumer_id, shard_id].join('_') end - - def redis_key_transformed(consumer_id) - ['kinesis', consumer_id, 'transformed'].join('_') - end - - def record_transformed?(consumer_id, etag) - k0 = redis_key_transformed(consumer_id) - redis.smembers(k0).each do |k1| - redis.srem(k0, k1) unless redis.exists?(k1) - return true if redis.sismember(k1, etag) - end - false - end - - def record_transformed(consumer_id, etag) - k0 = redis_key_transformed(consumer_id) - t = Time.now.utc.to_i / RECORD_TRANSFORMED_TTL - k1 = [k0, t].join('_') - redis.sadd(k0, k1) - redis.sadd(k1, etag) - redis.expire(k1, RECORD_TRANSFORMED_TTL * 2, nx: true) - end end end end diff --git a/lib/register_common/utils/expiring_set.rb b/lib/register_common/utils/expiring_set.rb new file mode 100644 index 0000000..a4b74ef --- /dev/null +++ b/lib/register_common/utils/expiring_set.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require 'redis' + +module RegisterCommon + module Utils + class ExpiringSet + def initialize(redis: nil, namespace: nil, ttl: 60) + @redis = redis + @namespace = namespace + @ttl = ttl + end + + def sadd(key, member) + t = Time.now.utc.to_i / @ttl + idx = _k_idx(key) + key1 = [idx, t].join('/') + @redis.sadd(idx, key1) + @redis.sadd(key1, member) + @redis.expire(key1, @ttl * 2, nx: true) + end + + def sismember(key, member) + idx = _k_idx(key) + @redis.smembers(idx).each do |key1| + @redis.srem(idx, key1) unless @redis.exists?(key1) + return true if @redis.sismember(key1, member) + end + false + end + + private + + def _k_idx(key) + [@namespace, key].join('/') + end + end + end +end From fdfc16c58f58ebca4c2a00916b77631d6852c98c Mon Sep 17 00:00:00 2001 From: Nicolas Williams Date: Fri, 10 May 2024 15:13:16 +0000 Subject: [PATCH 4/4] [#254] remove StreamClientKinesis track transformed This needs to be handled during both bulk and streaming transformations. That means Sources BODS is a better place for it. --- .rubocop.yml | 2 -- .../services/stream_client_kinesis.rb | 19 +++---------------- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index c07fc55..27b5c8c 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,8 +7,6 @@ AllCops: NewCops: enable Metrics/AbcSize: Enabled: false -Metrics/BlockLength: - Enabled: false Metrics/ClassLength: Enabled: false Metrics/MethodLength: diff --git a/lib/register_common/services/stream_client_kinesis.rb b/lib/register_common/services/stream_client_kinesis.rb index 077b008..5e877fe 100644 --- a/lib/register_common/services/stream_client_kinesis.rb +++ b/lib/register_common/services/stream_client_kinesis.rb @@ -5,7 +5,6 @@ require 'logger' require 'redis' -require_relative '../utils/expiring_set' require_relative 'msg_handler' module RegisterCommon @@ -18,9 +17,6 @@ module Services class SequenceError < StandardError; end class StreamClientKinesis - REDIS_TRANSFORMED_KEY = 'transformed' - REDIS_TRANSFORMED_TTL = 172_800 # 48h - # rubocop:disable Metrics/ParameterLists def initialize( credentials:, stream_name:, msg_handler: nil, s3_adapter: nil, s3_bucket: nil, redis: nil, @@ -40,10 +36,6 @@ def initialize( # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity def consume(consumer_id, limit: nil) - exp_set = Utils::ExpiringSet.new( - redis: @redis, namespace: consumer_id, ttl: REDIS_TRANSFORMED_TTL - ) - shard_ids = list_shards sequence_numbers = shard_ids.to_h do |shard_id| @@ -77,14 +69,9 @@ def consume(consumer_id, limit: nil) resp.records.each do |record| record_h = JSON.parse(record.data, symbolize_names: true) @logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}" - if exp_set.sismember(REDIS_TRANSFORMED_KEY, record_h[:data][:etag]) - @logger.debug "[#{shard_id}] SKIP: #{record_h[:data][:etag]}" - next - end yield msg_handler.process(record.data) - exp_set.sadd(REDIS_TRANSFORMED_KEY, record_h[:data][:etag]) record_count += 1 last_record = record @@ -140,12 +127,12 @@ def get_records(shard_iterator) end def get_sequence_number(consumer_id, shard_id) - key = redis_key_seq(consumer_id, shard_id) + key = redis_key(consumer_id, shard_id) redis.get key end def store_sequence_number(consumer_id, shard_id, sequence_number) - key = redis_key_seq(consumer_id, shard_id) + key = redis_key(consumer_id, shard_id) if sequence_number redis.set(key, sequence_number) else @@ -153,7 +140,7 @@ def store_sequence_number(consumer_id, shard_id, sequence_number) end end - def redis_key_seq(consumer_id, shard_id) + def redis_key(consumer_id, shard_id) ['kinesis', consumer_id, shard_id].join('_') end end