Skip to content

Commit

Permalink
[#254] remove StreamClientKinesis track transformed
Browse files Browse the repository at this point in the history
This needs to be handled during both bulk and streaming transformations.
That means Sources BODS is a better place for it.
  • Loading branch information
tiredpixel committed May 10, 2024
1 parent 8c720b4 commit fdfc16c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 18 deletions.
2 changes: 0 additions & 2 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ AllCops:
NewCops: enable
Metrics/AbcSize:
Enabled: false
Metrics/BlockLength:
Enabled: false
Metrics/ClassLength:
Enabled: false
Metrics/MethodLength:
Expand Down
19 changes: 3 additions & 16 deletions lib/register_common/services/stream_client_kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
require 'logger'
require 'redis'

require_relative '../utils/expiring_set'
require_relative 'msg_handler'

module RegisterCommon
Expand All @@ -18,9 +17,6 @@ module Services
class SequenceError < StandardError; end

class StreamClientKinesis
REDIS_TRANSFORMED_KEY = 'transformed'
REDIS_TRANSFORMED_TTL = 172_800 # 48h

# rubocop:disable Metrics/ParameterLists
def initialize(
credentials:, stream_name:, msg_handler: nil, s3_adapter: nil, s3_bucket: nil, redis: nil,
Expand All @@ -40,10 +36,6 @@ def initialize(

# rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def consume(consumer_id, limit: nil)
exp_set = Utils::ExpiringSet.new(
redis: @redis, namespace: consumer_id, ttl: REDIS_TRANSFORMED_TTL
)

shard_ids = list_shards

sequence_numbers = shard_ids.to_h do |shard_id|
Expand Down Expand Up @@ -77,14 +69,9 @@ def consume(consumer_id, limit: nil)
resp.records.each do |record|
record_h = JSON.parse(record.data, symbolize_names: true)
@logger.info "[#{shard_id}] [#{record.sequence_number}] #{record_h[:data][:links][:self]}"
if exp_set.sismember(REDIS_TRANSFORMED_KEY, record_h[:data][:etag])
@logger.debug "[#{shard_id}] SKIP: #{record_h[:data][:etag]}"
next
end

yield msg_handler.process(record.data)

exp_set.sadd(REDIS_TRANSFORMED_KEY, record_h[:data][:etag])
record_count += 1
last_record = record

Expand Down Expand Up @@ -140,20 +127,20 @@ def get_records(shard_iterator)
end

def get_sequence_number(consumer_id, shard_id)
key = redis_key_seq(consumer_id, shard_id)
key = redis_key(consumer_id, shard_id)
redis.get key
end

def store_sequence_number(consumer_id, shard_id, sequence_number)
key = redis_key_seq(consumer_id, shard_id)
key = redis_key(consumer_id, shard_id)
if sequence_number
redis.set(key, sequence_number)
else
redis.del(key)
end
end

def redis_key_seq(consumer_id, shard_id)
def redis_key(consumer_id, shard_id)
['kinesis', consumer_id, shard_id].join('_')
end
end
Expand Down

0 comments on commit fdfc16c

Please sign in to comment.