Skip to content

Commit

Permalink
Merge pull request #60 from cul/extended-remote-fixity-check-http-tim…
Browse files Browse the repository at this point in the history
…eout-and-alternate-polling-method

Extended remote fixity check http timeout and alternate polling method
  • Loading branch information
elohanlon authored Oct 28, 2024
2 parents 44dcfb2 + 7d779e4 commit 3644132
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 10 deletions.
2 changes: 1 addition & 1 deletion app/jobs/verify_fixity_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def perform(stored_object_id)
def handle_unexpected_error(fixity_verification_record, err)
fixity_verification_record.update!(
status: :failure,
error_message: "An unexpected error occurred: #{err.message}"
error_message: "An unexpected error occurred: #{err.class.name} -> #{err.message}"
)
end

Expand Down
12 changes: 10 additions & 2 deletions config/initializers/check_please.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# frozen_string_literal: true

# Load CHECK_PLEASE config
CHECK_PLEASE = Rails.application.config_for(:check_please).deep_symbolize_keys
CHECK_PLEASE = Rails.application.config_for(:check_please)

def validate_check_please_config!
missing_options = [:http_base_url, :ws_url, :auth_token, :http_timeout].reject do |required_config_option|
CHECK_PLEASE.key?(required_config_option)
end
raise "Missing required check_please.yml options: #{missing_options.inspect}" if missing_options.present?
end

validate_check_please_config!
4 changes: 4 additions & 0 deletions config/templates/check_please.template.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
development:
http_base_url: 'http://dev.example.com'
ws_url: 'ws://dev.example.com/cable'
auth_token: 'changethis'
http_timeout: 300 # 5 minutes
test:
http_base_url: 'http://test.example.com'
ws_url: 'ws://test.example.com/cable'
auth_token: 'changethis'
http_timeout: 300 # 5 minutes
5 changes: 4 additions & 1 deletion lib/atc/aws/fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ class Atc::Aws::FixityCheck
def initialize(stored_object, fixity_check_identifier)
@bucket_name = stored_object.storage_provider.container_name
@object_path = stored_object.path
@expected_object_size = stored_object.source_object.object_size
@fixity_checksum_algorithm = stored_object.source_object.fixity_checksum_algorithm
@fixity_check_identifier = fixity_check_identifier
end
Expand All @@ -18,7 +19,9 @@ def fixity_checksum_object_size
).perform(
@fixity_check_identifier, @bucket_name,
@object_path, @fixity_checksum_algorithm.name.downcase,
Atc::Aws::RemoteFixityCheck::HTTP
# A synchronous HTTP check is faster than polling for smaller files,
# so we'll only use the polling method for larger files.
@expected_object_size < 1.gigabyte ? Atc::Aws::RemoteFixityCheck::HTTP : Atc::Aws::RemoteFixityCheck::HTTP_POLLING
)
[response['checksum_hexdigest'], response['object_size'], response['error_message']]
end
Expand Down
73 changes: 70 additions & 3 deletions lib/atc/aws/remote_fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
# rubocop:disable Metrics/PerceivedComplexity

class Atc::Aws::RemoteFixityCheck
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 10.seconds
# This value shouldn't be too high because we want to detect stalled jobs fairly soon after they stall,
# but it should be high enough to account for downtime/delays related to CheckPlease app deployments.
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 1.minute
POLLING_DELAY = 2.seconds
MAX_WAIT_TIME_FOR_POLLING_JOB_START = 1.hour
WEBSOCKET = 'websocket'
HTTP = 'http'
HTTP_POLLING = 'http_polling'

def initialize(http_base_url, ws_url, auth_token)
@http_base_url = http_base_url
Expand All @@ -18,7 +23,7 @@ def initialize(http_base_url, ws_url, auth_token)
end

def http_client
@http_client ||= ::Faraday.new(url: @http_base_url) do |f|
@http_client ||= ::Faraday.new(url: @http_base_url, request: { timeout: CHECK_PLEASE['http_timeout'] }) do |f|
f.request :authorization, 'Bearer', @auth_token
f.response :json # decode response bodies as JSON
f.response :raise_error # raise 4xx and 5xx responses as errors
Expand All @@ -41,6 +46,8 @@ def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name, m
perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)['data']
when HTTP
perform_http(bucket_name, object_path, checksum_algorithm_name)
when HTTP_POLLING
perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
else
raise ArgumentError, "Unsupported perform method: #{method}"
end
Expand All @@ -61,7 +68,67 @@ def perform_http(bucket_name, object_path, checksum_algorithm_name)
JSON.parse(response.body)
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil, 'error_message' => "An unexpected error occurred: #{e.message}"
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
}
end

def perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
payload = {
'fixity_check' => {
'bucket_name' => bucket_name,
'object_path' => object_path,
'checksum_algorithm_name' => checksum_algorithm_name
}
}.to_json

fixity_check_create_response = http_client.post('/fixity_checks', payload) { |request|
request.headers['Content-Type'] = 'application/json'
}.body

if fixity_check_create_response['error_message'].present?
# Raise any unexpected error message. It will be handled elsewhere.
raise Atc::Exceptions::AtcError,
fixity_check_create_response['error_message']
end

# If we got here, that means that the fixity check request was created successfully.
# Now we'll poll and wait for it to complete.
last_progress_update_time = nil
fixity_check_response = nil
loop do
sleep POLLING_DELAY

fixity_check_response = http_client.get("/fixity_checks/#{fixity_check_create_response['id']}") { |request|
request.headers['Content-Type'] = 'application/json'
}.body

status = fixity_check_response['status']

break if ['success', 'error'].include?(status)

# If the fixity check has a 'pending' status, we'll skip this polling iteration and try again in the next poll
# operation. It is the reponsibility of the CheckPlease app to periodically clean up jobs that have been pending
# for too long, so we don't need to worry about that case in this app.
next if status == 'pending'

# If we got here, that means that the job is in progress. Let's account for the
# possibility of the job timing out, if something goes wrong on the CheckPlease app side.
last_progress_update_time = Time.zone.parse(fixity_check_response['updated_at'])
if job_unresponsive?(last_progress_update_time)
raise Atc::Exceptions::RemoteFixityCheckTimeout,
'Timed out while waiting for a response.'
end
rescue Faraday::Error => e
# If we run into a network error, log it, but continue looping
Rails.logger.info 'Error connecting to CheckPlease app during fixity check polling '\
"loop iteration: #{e.class.name} -> #{e.message}"
end
fixity_check_response
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
}
end

Expand Down
1 change: 1 addition & 0 deletions lib/atc/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ class TransferError < AtcError; end
class ObjectExists < AtcError; end
class StorageProviderMappingNotFound < AtcError; end
class RemoteFixityCheckTimeout < AtcError; end
class PollingWaitTimeoutError < AtcError; end
end
2 changes: 1 addition & 1 deletion lib/tasks/aws.rake
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace :atc do
)
response = remote_fixity_check.perform(
job_identifier, bucket_name, object_path, checksum_algorithm_name,
Atc::Aws::RemoteFixityCheck::HTTP
Atc::Aws::RemoteFixityCheck::HTTP_POLLING
)
puts "Response: #{response.inspect}"
end
Expand Down
20 changes: 20 additions & 0 deletions lib/tasks/comparison.rake
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,25 @@ namespace :atc do

puts "Done!"
end

desc 'List multipart info for an S3 object (including part size)'
# Example usage: bucket_name=cul-dlstor-digital-testing1 path='test-5gb-file-ubuntu.iso'
task list_multipart_info: :environment do
bucket_name = ENV['bucket_name']
path = ENV['path']

attributes = S3_CLIENT.get_object_attributes(
bucket: bucket_name,
key: path,
object_attributes: %w[ETag Checksum ObjectParts StorageClass ObjectSize]
)

if attributes.object_parts.blank?
puts "Single part file."
else
puts "Found multipart file (#{attributes.object_parts.total_parts_count} parts) "\
"with individual part array data for #{attributes.object_parts.parts.length} parts."
end
end
end
end
2 changes: 1 addition & 1 deletion spec/atc/aws/remote_fixity_check_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
{
'checksum_hexdigest' => nil,
'object_size' => nil,
'error_message' => 'An unexpected error occurred: oh no!'
'error_message' => 'An unexpected error occurred: StandardError -> oh no!'
}
)
end
Expand Down
4 changes: 3 additions & 1 deletion spec/jobs/verify_fixity_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@
expect(aws_fixity_verification_pending).to receive(:update!).and_call_original
verify_fixity_job.handle_unexpected_error(aws_fixity_verification_pending, StandardError.new('oh no!'))
expect(aws_fixity_verification_pending.status).to eq('failure')
expect(aws_fixity_verification_pending.error_message).to eq('An unexpected error occurred: oh no!')
expect(aws_fixity_verification_pending.error_message).to eq(
'An unexpected error occurred: StandardError -> oh no!'
)
expect(aws_fixity_verification_pending.changed?).to eq(false) # verify that the record has no unsaved changes
end
end
Expand Down

0 comments on commit 3644132

Please sign in to comment.