Skip to content

Commit

Permalink
Add HTTP_POLLING option to Atc::Aws::RemoteFixityCheck; Add additiona…
Browse files Browse the repository at this point in the history
…l validations for check_please.yml config; Add atc:comparison:list_multipart_info task for later use
  • Loading branch information
elohanlon committed Oct 6, 2024
1 parent d043b60 commit 579a110
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 8 deletions.
2 changes: 1 addition & 1 deletion app/jobs/verify_fixity_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def perform(stored_object_id)
def handle_unexpected_error(fixity_verification_record, err)
fixity_verification_record.update!(
status: :failure,
error_message: "An unexpected error occurred: #{err.message}"
error_message: "An unexpected error occurred: #{err.class.name} -> #{err.message}"
)
end

Expand Down
12 changes: 10 additions & 2 deletions config/initializers/check_please.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# frozen_string_literal: true

# Load CHECK_PLEASE config
CHECK_PLEASE = Rails.application.config_for(:check_please).deep_symbolize_keys
CHECK_PLEASE = Rails.application.config_for(:check_please)

# Confirms that every required option key is present in the loaded CHECK_PLEASE config.
# Only the presence of each key is checked; the values themselves are not inspected.
#
# @return [nil] when no required key is missing
# @raise [RuntimeError] listing each required key that is absent from CHECK_PLEASE
def validate_check_please_config!
  required_options = %i[http_base_url ws_url auth_token http_timeout]
  absent_options = required_options.select { |option| !CHECK_PLEASE.key?(option) }
  return unless absent_options.present?

  raise "Missing required check_please.yml options: #{absent_options.inspect}"
end

validate_check_please_config!
5 changes: 4 additions & 1 deletion lib/atc/aws/fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ class Atc::Aws::FixityCheck
# Captures what a fixity check needs to know about a stored object: the container
# name of its storage provider, its path, and the object size and fixity checksum
# algorithm recorded on its source object.
#
# @param stored_object the object to check; must respond to storage_provider, path and source_object
# @param fixity_check_identifier identifier stored as-is for later use by the check
def initialize(stored_object, fixity_check_identifier)
  source_object = stored_object.source_object

  @fixity_check_identifier = fixity_check_identifier
  @bucket_name = stored_object.storage_provider.container_name
  @object_path = stored_object.path
  @expected_object_size = source_object.object_size
  @fixity_checksum_algorithm = source_object.fixity_checksum_algorithm
end
Expand All @@ -18,7 +19,9 @@ def fixity_checksum_object_size
).perform(
@fixity_check_identifier, @bucket_name,
@object_path, @fixity_checksum_algorithm.name.downcase,
Atc::Aws::RemoteFixityCheck::HTTP
# A synchronous HTTP check is faster than polling for smaller files,
# so we'll only use the polling method for larger files.
@expected_object_size < 1.gigabyte ? Atc::Aws::RemoteFixityCheck::HTTP : Atc::Aws::RemoteFixityCheck::HTTP_POLLING
)
[response['checksum_hexdigest'], response['object_size'], response['error_message']]
end
Expand Down
70 changes: 69 additions & 1 deletion lib/atc/aws/remote_fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

class Atc::Aws::RemoteFixityCheck
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 10.seconds
POLLING_DELAY = 2.seconds
MAX_WAIT_TIME_FOR_POLLING_JOB_START = 1.minute
WEBSOCKET = 'websocket'
HTTP = 'http'
HTTP_POLLING = 'http_polling'

def initialize(http_base_url, ws_url, auth_token)
@http_base_url = http_base_url
Expand Down Expand Up @@ -41,6 +44,8 @@ def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name, m
perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)['data']
when HTTP
perform_http(bucket_name, object_path, checksum_algorithm_name)
when HTTP_POLLING
perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
else
raise ArgumentError, "Unsupported perform method: #{method}"
end
Expand All @@ -61,7 +66,70 @@ def perform_http(bucket_name, object_path, checksum_algorithm_name)
JSON.parse(response.body)
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil, 'error_message' => "An unexpected error occurred: #{e.message}"
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
}
end

# Runs a fixity check through the remote HTTP API without holding a single request open:
# creates a fixity check with a POST to /fixity_checks, then polls GET /fixity_checks/:id
# every POLLING_DELAY until the reported status is 'success' or 'error'.
#
# @param bucket_name value sent as fixity_check.bucket_name in the create payload
# @param object_path value sent as fixity_check.object_path in the create payload
# @param checksum_algorithm_name value sent as fixity_check.checksum_algorithm_name
# @return [Hash] the last polled fixity check response once its status is 'success' or
#   'error'. If any StandardError is raised along the way (an error_message in the create
#   response, a polling timeout, an unparseable body, ...), a Hash with nil
#   'checksum_hexdigest' and 'object_size' plus an 'error_message' describing the error.
def perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
  # Response bodies are indexed like Hashes below, so a body that arrives as raw JSON text
  # must be decoded first. A body that is already decoded is used unchanged.
  decode_body = ->(body) { body.is_a?(String) ? JSON.parse(body) : body }

  start_time = Time.current
  payload = {
    'fixity_check' => {
      'bucket_name' => bucket_name,
      'object_path' => object_path,
      'checksum_algorithm_name' => checksum_algorithm_name
    }
  }.to_json

  create_response = http_client.post('/fixity_checks', payload) do |request|
    request.headers['Content-Type'] = 'application/json'
  end
  fixity_check_create_response = decode_body.call(create_response.body)

  if fixity_check_create_response['error_message'].present?
    # Raise any unexpected error message. It is converted to an error Hash by the rescue below.
    raise Atc::Exceptions::AtcError,
          fixity_check_create_response['error_message']
  end

  # If we got here, that means that the fixity check request was created successfully.
  # Now we'll poll and wait for it to complete.
  fixity_check_response = nil
  loop do
    sleep POLLING_DELAY
    poll_response = http_client.get("/fixity_checks/#{fixity_check_create_response['id']}") do |request|
      request.headers['Content-Type'] = 'application/json'
    end
    fixity_check_response = decode_body.call(poll_response.body)
    status = fixity_check_response['status']

    break if %w[success error].include?(status)

    # If we receive a pending status, we're waiting for a background job to start processing our request.
    # Ideally this won't be for too long, since we expect there to be at least as many CheckPlease background
    # workers as there are ATC fixity check request workers, but we'll add a timeout here just in case anything
    # is ever incorrectly configured, just so that this job doesn't ever hang indefinitely.
    if status == 'pending'
      next if Time.current - start_time < MAX_WAIT_TIME_FOR_POLLING_JOB_START

      raise Atc::Exceptions::PollingWaitTimeoutError,
            'Polling wait time has exceeded MAX_WAIT_TIME_FOR_POLLING_JOB_START '\
            "(#{MAX_WAIT_TIME_FOR_POLLING_JOB_START} seconds)"
    end

    # If we got here, that means that the job is in progress. Let's account for the
    # possibility of the job timing out, if something goes wrong on the CheckPlease app side.
    last_progress_update_time = Time.zone.parse(fixity_check_response['updated_at'])
    if job_unresponsive?(last_progress_update_time)
      raise Atc::Exceptions::RemoteFixityCheckTimeout,
            'Timed out while waiting for a response.'
    end
  end
  fixity_check_response
rescue StandardError => e
  {
    'checksum_hexdigest' => nil, 'object_size' => nil,
    'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
  }
end

Expand Down
1 change: 1 addition & 0 deletions lib/atc/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ class TransferError < AtcError; end
class ObjectExists < AtcError; end
class StorageProviderMappingNotFound < AtcError; end
class RemoteFixityCheckTimeout < AtcError; end
class PollingWaitTimeoutError < AtcError; end
end
2 changes: 1 addition & 1 deletion lib/tasks/aws.rake
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace :atc do
)
response = remote_fixity_check.perform(
job_identifier, bucket_name, object_path, checksum_algorithm_name,
Atc::Aws::RemoteFixityCheck::HTTP
Atc::Aws::RemoteFixityCheck::HTTP_POLLING
)
puts "Response: #{response.inspect}"
end
Expand Down
20 changes: 20 additions & 0 deletions lib/tasks/comparison.rake
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,25 @@ namespace :atc do

puts "Done!"
end

desc 'List multipart info for an S3 object (including part size)'
# Example usage: bucket_name=cul-dlstor-digital-testing1 path='test-5gb-file-ubuntu.iso'
task list_multipart_info: :environment do
  # The object to inspect is identified by the bucket_name and path environment variables.
  object_attributes = S3_CLIENT.get_object_attributes(
    bucket: ENV['bucket_name'],
    key: ENV['path'],
    object_attributes: %w[ETag Checksum ObjectParts StorageClass ObjectSize]
  )
  object_parts = object_attributes.object_parts

  # Blank object_parts is reported as a single part file; otherwise print the total part
  # count alongside the number of entries in the returned parts array.
  summary =
    if object_parts.blank?
      "Single part file."
    else
      "Found multipart file (#{object_parts.total_parts_count} parts) " \
        "with individual part array data for #{object_parts.parts.length} parts."
    end
  puts summary
end
end
end
2 changes: 1 addition & 1 deletion spec/atc/aws/remote_fixity_check_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
{
'checksum_hexdigest' => nil,
'object_size' => nil,
'error_message' => 'An unexpected error occurred: oh no!'
'error_message' => 'An unexpected error occurred: StandardError -> oh no!'
}
)
end
Expand Down
4 changes: 3 additions & 1 deletion spec/jobs/verify_fixity_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@
expect(aws_fixity_verification_pending).to receive(:update!).and_call_original
verify_fixity_job.handle_unexpected_error(aws_fixity_verification_pending, StandardError.new('oh no!'))
expect(aws_fixity_verification_pending.status).to eq('failure')
expect(aws_fixity_verification_pending.error_message).to eq('An unexpected error occurred: oh no!')
expect(aws_fixity_verification_pending.error_message).to eq(
'An unexpected error occurred: StandardError -> oh no!'
)
expect(aws_fixity_verification_pending.changed?).to eq(false) # verify that the record has no unsaved changes
end
end
Expand Down

0 comments on commit 579a110

Please sign in to comment.