From 25afde5cf52bf1908f824f5f139d5b578674eaf0 Mon Sep 17 00:00:00 2001 From: Nicolas Williams Date: Thu, 27 Jun 2024 18:16:15 +0000 Subject: [PATCH] [#270] rewrite Adapters::HttpAdapter to fix missed events in streams As per: https://github.com/openownership/register/issues/270#issuecomment-2195382044 Also simplify the code significantly. --- lib/register_common/adapters/http_adapter.rb | 67 ++++++++------------ 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/lib/register_common/adapters/http_adapter.rb b/lib/register_common/adapters/http_adapter.rb index 848bf96..80ea973 100644 --- a/lib/register_common/adapters/http_adapter.rb +++ b/lib/register_common/adapters/http_adapter.rb @@ -8,51 +8,40 @@ module RegisterCommon module Adapters class HttpAdapter HttpError = Class.new(StandardError) + HttpResponse = Struct.new(:status, :headers, :body, :success) - # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - def get(url, params: {}, headers: {}, raise_on_failure: false) - streamed = nil - response = - if block_given? - streamed = [] - current_chunk = '' - - Faraday.new.get( - URI(url), params, headers - ) do |req| - req.options.on_data = proc do |chunk, _overall_received_bytes| - current_chunk += chunk - lines = current_chunk.split("\n") - if current_chunk[-1] == "\n" - lines[0...-1].each do |line| - next if line.empty? - - yield line - end - current_chunk = '' - elsif lines.length > 1 - lines[0...-1].each do |line| - next if line.empty? - - yield line - end - current_chunk = lines[-1] - end - streamed << chunk + def get(url, params: {}, headers: {}, raise_on_failure: false, &block) + url = URI(url) + res = if block_given? + get_streamed(url, params, headers, &block) + else + get_all(url, params, headers) end - end - else - Faraday.new.get( - URI(url), params, headers - ) - end + raise HttpError if !res.success? && raise_on_failure + + HttpResponse.new(res.status, res.headers, res.body, res.success?) + end - raise HttpError if !response.success? && raise_on_failure + private - HttpResponse.new(response.status, response.headers, streamed ? streamed.join : response.body, response.success?) + def get_all(url, params, headers) + Faraday.new.get(url, params, headers) + end + + def get_streamed(url, params, headers) + bfr = '' + Faraday.new.get(url, params, headers) do |req| + req.options.on_data = proc do |chunk| + bfr += chunk + lines = bfr.split("\n", -1) + lines[0...-1].each do |line| + yield line unless line.empty? + end + bfr = lines[-1] + end + end end - # rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity end end end