diff --git a/.rubocop.yml b/.rubocop.yml index c07fc55..27b5c8c 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,8 +7,6 @@ AllCops: NewCops: enable Metrics/AbcSize: Enabled: false -Metrics/BlockLength: - Enabled: false Metrics/ClassLength: Enabled: false Metrics/MethodLength: diff --git a/lib/register_common/services/stream_client_kinesis.rb b/lib/register_common/services/stream_client_kinesis.rb index 077b008..5e877fe 100644 --- a/lib/register_common/services/stream_client_kinesis.rb +++ b/lib/register_common/services/stream_client_kinesis.rb @@ -5,7 +5,6 @@ require 'logger' require 'redis' -require_relative '../utils/expiring_set' require_relative 'msg_handler' module RegisterCommon @@ -18,9 +17,6 @@ module Services class SequenceError < StandardError; end class StreamClientKinesis - REDIS_TRANSFORMED_KEY = 'transformed' - REDIS_TRANSFORMED_TTL = 172_800 # 48h - # rubocop:disable Metrics/ParameterLists def initialize( credentials:, stream_name:, msg_handler: nil, s3_adapter: nil, s3_bucket: nil, redis: nil, @@ -40,10 +36,6 @@ def initialize( # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity def consume(consumer_id, limit: nil) - exp_set = Utils::ExpiringSet.new( - redis: @redis, namespace: consumer_id, ttl: REDIS_TRANSFORMED_TTL - ) - shard_ids = list_shards sequence_numbers = shard_ids.to_h do |shard_id| @@ -77,14 +69,9 @@ def consume(consumer_id, limit: nil) resp.records.each do |record| record_h = JSON.parse(record.data, symbolize_names: true) @logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}" - if exp_set.sismember(REDIS_TRANSFORMED_KEY, record_h[:data][:etag]) - @logger.debug "[#{shard_id}] SKIP: #{record_h[:data][:etag]}" - next - end yield msg_handler.process(record.data) - exp_set.sadd(REDIS_TRANSFORMED_KEY, record_h[:data][:etag]) record_count += 1 last_record = record @@ -140,12 +127,12 @@ def get_records(shard_iterator) end def get_sequence_number(consumer_id, shard_id) - key = redis_key_seq(consumer_id, shard_id) + key = redis_key(consumer_id, shard_id) redis.get key end def store_sequence_number(consumer_id, shard_id, sequence_number) - key = redis_key_seq(consumer_id, shard_id) + key = redis_key(consumer_id, shard_id) if sequence_number redis.set(key, sequence_number) else @@ -153,7 +140,7 @@ def store_sequence_number(consumer_id, shard_id, sequence_number) end end - def redis_key_seq(consumer_id, shard_id) + def redis_key(consumer_id, shard_id) ['kinesis', consumer_id, shard_id].join('_') end end