diff --git a/app/jobs/verify_fixity_job.rb b/app/jobs/verify_fixity_job.rb index 1e42e51..e292785 100644 --- a/app/jobs/verify_fixity_job.rb +++ b/app/jobs/verify_fixity_job.rb @@ -25,7 +25,7 @@ def perform(stored_object_id) def handle_unexpected_error(fixity_verification_record, err) fixity_verification_record.update!( status: :failure, - error_message: "An unexpected error occurred: #{err.message}" + error_message: "An unexpected error occurred: #{err.class.name} -> #{err.message}" ) end diff --git a/config/initializers/check_please.rb b/config/initializers/check_please.rb index 3a7e9f1..a731818 100644 --- a/config/initializers/check_please.rb +++ b/config/initializers/check_please.rb @@ -1,4 +1,12 @@ # frozen_string_literal: true -# Load CHECK_PLEASE config -CHECK_PLEASE = Rails.application.config_for(:check_please).deep_symbolize_keys +CHECK_PLEASE = Rails.application.config_for(:check_please) + +def validate_check_please_config! + missing_options = [:http_base_url, :ws_url, :auth_token, :http_timeout].reject do |required_config_option| + CHECK_PLEASE.key?(required_config_option) + end + raise "Missing required check_please.yml options: #{missing_options.inspect}" if missing_options.present? +end + +validate_check_please_config! diff --git a/config/templates/check_please.template.yml b/config/templates/check_please.template.yml index 06ee634..3e5fc62 100644 --- a/config/templates/check_please.template.yml +++ b/config/templates/check_please.template.yml @@ -1,6 +1,10 @@ development: + http_base_url: 'http://dev.example.com' ws_url: 'ws://dev.example.com/cable' auth_token: 'changethis' + http_timeout: 300 # 5 minutes test: + http_base_url: 'http://test.example.com' ws_url: 'ws://test.example.com/cable' auth_token: 'changethis' + http_timeout: 300 # 5 minutes diff --git a/lib/atc/aws/fixity_check.rb b/lib/atc/aws/fixity_check.rb index 1561c8d..0502ee7 100644 --- a/lib/atc/aws/fixity_check.rb +++ b/lib/atc/aws/fixity_check.rb @@ -4,6 +4,7 @@ class Atc::Aws::FixityCheck def initialize(stored_object, fixity_check_identifier) @bucket_name = stored_object.storage_provider.container_name @object_path = stored_object.path + @expected_object_size = stored_object.source_object.object_size @fixity_checksum_algorithm = stored_object.source_object.fixity_checksum_algorithm @fixity_check_identifier = fixity_check_identifier end @@ -18,7 +19,9 @@ def fixity_checksum_object_size ).perform( @fixity_check_identifier, @bucket_name, @object_path, @fixity_checksum_algorithm.name.downcase, - Atc::Aws::RemoteFixityCheck::HTTP + # A synchronous HTTP check is faster than polling for smaller files, + # so we'll only use the polling method for larger files. + @expected_object_size < 1.gigabyte ? Atc::Aws::RemoteFixityCheck::HTTP : Atc::Aws::RemoteFixityCheck::HTTP_POLLING ) [response['checksum_hexdigest'], response['object_size'], response['error_message']] end diff --git a/lib/atc/aws/remote_fixity_check.rb b/lib/atc/aws/remote_fixity_check.rb index 6ba4649..3d37932 100644 --- a/lib/atc/aws/remote_fixity_check.rb +++ b/lib/atc/aws/remote_fixity_check.rb @@ -7,9 +7,14 @@ # rubocop:disable Metrics/PerceivedComplexity class Atc::Aws::RemoteFixityCheck - STALLED_FIXITY_CHECK_JOB_TIMEOUT = 10.seconds + # This value shouldn't be too high because we want to detect stalled jobs fairly soon after they stall, + # but it should be high enough to account for downtime/delays related to CheckPlease app deployments. + STALLED_FIXITY_CHECK_JOB_TIMEOUT = 1.minute + POLLING_DELAY = 2.seconds + MAX_WAIT_TIME_FOR_POLLING_JOB_START = 1.hour WEBSOCKET = 'websocket' HTTP = 'http' + HTTP_POLLING = 'http_polling' def initialize(http_base_url, ws_url, auth_token) @http_base_url = http_base_url @@ -18,7 +23,7 @@ def initialize(http_base_url, ws_url, auth_token) end def http_client - @http_client ||= ::Faraday.new(url: @http_base_url) do |f| + @http_client ||= ::Faraday.new(url: @http_base_url, request: { timeout: CHECK_PLEASE['http_timeout'] }) do |f| f.request :authorization, 'Bearer', @auth_token f.response :json # decode response bodies as JSON f.response :raise_error # raise 4xx and 5xx responses as errors @@ -41,6 +46,8 @@ def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name, m perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)['data'] when HTTP perform_http(bucket_name, object_path, checksum_algorithm_name) + when HTTP_POLLING + perform_http_polling(bucket_name, object_path, checksum_algorithm_name) else raise ArgumentError, "Unsupported perform method: #{method}" end @@ -61,7 +68,67 @@ def perform_http(bucket_name, object_path, checksum_algorithm_name) JSON.parse(response.body) rescue StandardError => e { - 'checksum_hexdigest' => nil, 'object_size' => nil, 'error_message' => "An unexpected error occurred: #{e.message}" + 'checksum_hexdigest' => nil, 'object_size' => nil, + 'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}" + } + end + + def perform_http_polling(bucket_name, object_path, checksum_algorithm_name) + payload = { + 'fixity_check' => { + 'bucket_name' => bucket_name, + 'object_path' => object_path, + 'checksum_algorithm_name' => checksum_algorithm_name + } + }.to_json + + fixity_check_create_response = http_client.post('/fixity_checks', payload) { |request| + request.headers['Content-Type'] = 'application/json' + }.body + + if fixity_check_create_response['error_message'].present? + # Raise any unexpected error message. It will be handled elsewhere. + raise Atc::Exceptions::AtcError, + fixity_check_create_response['error_message'] + end + + # If we got here, that means that the fixity check request was created successfully. + # Now we'll poll and wait for it to complete. + last_progress_update_time = nil + fixity_check_response = nil + loop do + sleep POLLING_DELAY + + fixity_check_response = http_client.get("/fixity_checks/#{fixity_check_create_response['id']}") { |request| + request.headers['Content-Type'] = 'application/json' + }.body + + status = fixity_check_response['status'] + + break if ['success', 'error'].include?(status) + + # If the fixity check has a 'pending' status, we'll skip this polling iteration and try again in the next poll + # operation. It is the reponsibility of the CheckPlease app to periodically clean up jobs that have been pending + # for too long, so we don't need to worry about that case in this app. + next if status == 'pending' + + # If we got here, that means that the job is in progress. Let's account for the + # possibility of the job timing out, if something goes wrong on the CheckPlease app side. + last_progress_update_time = Time.zone.parse(fixity_check_response['updated_at']) + if job_unresponsive?(last_progress_update_time) + raise Atc::Exceptions::RemoteFixityCheckTimeout, + 'Timed out while waiting for a response.' + end + rescue Faraday::Error => e + # If we run into a network error, log it, but continue looping + Rails.logger.info 'Error connecting to CheckPlease app during fixity check polling '\ + "loop iteration: #{e.class.name} -> #{e.message}" + end + fixity_check_response + rescue StandardError => e + { + 'checksum_hexdigest' => nil, 'object_size' => nil, + 'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}" } end diff --git a/lib/atc/exceptions.rb b/lib/atc/exceptions.rb index f210aee..f7a5cae 100644 --- a/lib/atc/exceptions.rb +++ b/lib/atc/exceptions.rb @@ -7,4 +7,5 @@ class TransferError < AtcError; end class ObjectExists < AtcError; end class StorageProviderMappingNotFound < AtcError; end class RemoteFixityCheckTimeout < AtcError; end + class PollingWaitTimeoutError < AtcError; end end diff --git a/lib/tasks/aws.rake b/lib/tasks/aws.rake index c413a22..6163c94 100644 --- a/lib/tasks/aws.rake +++ b/lib/tasks/aws.rake @@ -14,7 +14,7 @@ namespace :atc do ) response = remote_fixity_check.perform( job_identifier, bucket_name, object_path, checksum_algorithm_name, - Atc::Aws::RemoteFixityCheck::HTTP + Atc::Aws::RemoteFixityCheck::HTTP_POLLING ) puts "Response: #{response.inspect}" end diff --git a/lib/tasks/comparison.rake b/lib/tasks/comparison.rake index 8fbd994..c56c026 100644 --- a/lib/tasks/comparison.rake +++ b/lib/tasks/comparison.rake @@ -41,5 +41,25 @@ namespace :atc do puts "Done!" end + + desc 'List multipart info for an S3 object (including part size)' + # Example usage: bucket_name=cul-dlstor-digital-testing1 path='test-5gb-file-ubuntu.iso' + task list_multipart_info: :environment do + bucket_name = ENV['bucket_name'] + path = ENV['path'] + + attributes = S3_CLIENT.get_object_attributes( + bucket: bucket_name, + key: path, + object_attributes: %w[ETag Checksum ObjectParts StorageClass ObjectSize] + ) + + if attributes.object_parts.blank? + puts "Single part file." + else + puts "Found multipart file (#{attributes.object_parts.total_parts_count} parts) "\ + "with individual part array data for #{attributes.object_parts.parts.length} parts." + end + end end end diff --git a/spec/atc/aws/remote_fixity_check_spec.rb b/spec/atc/aws/remote_fixity_check_spec.rb index 011e36b..2589ec6 100644 --- a/spec/atc/aws/remote_fixity_check_spec.rb +++ b/spec/atc/aws/remote_fixity_check_spec.rb @@ -139,7 +139,7 @@ { 'checksum_hexdigest' => nil, 'object_size' => nil, - 'error_message' => 'An unexpected error occurred: oh no!' + 'error_message' => 'An unexpected error occurred: StandardError -> oh no!' } ) end diff --git a/spec/jobs/verify_fixity_job_spec.rb b/spec/jobs/verify_fixity_job_spec.rb index 094a343..14e3c05 100644 --- a/spec/jobs/verify_fixity_job_spec.rb +++ b/spec/jobs/verify_fixity_job_spec.rb @@ -80,7 +80,9 @@ expect(aws_fixity_verification_pending).to receive(:update!).and_call_original verify_fixity_job.handle_unexpected_error(aws_fixity_verification_pending, StandardError.new('oh no!')) expect(aws_fixity_verification_pending.status).to eq('failure') - expect(aws_fixity_verification_pending.error_message).to eq('An unexpected error occurred: oh no!') + expect(aws_fixity_verification_pending.error_message).to eq( + 'An unexpected error occurred: StandardError -> oh no!' + ) expect(aws_fixity_verification_pending.changed?).to eq(false) # verify that the record has no unsaved changes end end