Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended remote fixity check http timeout and alternate polling method #60

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/jobs/verify_fixity_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def perform(stored_object_id)
def handle_unexpected_error(fixity_verification_record, err)
fixity_verification_record.update!(
status: :failure,
error_message: "An unexpected error occurred: #{err.message}"
error_message: "An unexpected error occurred: #{err.class.name} -> #{err.message}"
)
end

Expand Down
12 changes: 10 additions & 2 deletions config/initializers/check_please.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# frozen_string_literal: true

# Load CHECK_PLEASE config
CHECK_PLEASE = Rails.application.config_for(:check_please).deep_symbolize_keys
CHECK_PLEASE = Rails.application.config_for(:check_please)

def validate_check_please_config!
missing_options = [:http_base_url, :ws_url, :auth_token, :http_timeout].reject do |required_config_option|
CHECK_PLEASE.key?(required_config_option)
end
raise "Missing required check_please.yml options: #{missing_options.inspect}" if missing_options.present?
end

validate_check_please_config!
4 changes: 4 additions & 0 deletions config/templates/check_please.template.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
development:
http_base_url: 'http://dev.example.com'
ws_url: 'ws://dev.example.com/cable'
auth_token: 'changethis'
http_timeout: 300 # 5 minutes
test:
http_base_url: 'http://test.example.com'
ws_url: 'ws://test.example.com/cable'
auth_token: 'changethis'
http_timeout: 300 # 5 minutes
5 changes: 4 additions & 1 deletion lib/atc/aws/fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ class Atc::Aws::FixityCheck
def initialize(stored_object, fixity_check_identifier)
@bucket_name = stored_object.storage_provider.container_name
@object_path = stored_object.path
@expected_object_size = stored_object.source_object.object_size
@fixity_checksum_algorithm = stored_object.source_object.fixity_checksum_algorithm
@fixity_check_identifier = fixity_check_identifier
end
Expand All @@ -18,7 +19,9 @@ def fixity_checksum_object_size
).perform(
@fixity_check_identifier, @bucket_name,
@object_path, @fixity_checksum_algorithm.name.downcase,
Atc::Aws::RemoteFixityCheck::HTTP
# A synchronous HTTP check is faster than polling for smaller files,
# so we'll only use the polling method for larger files.
@expected_object_size < 1.gigabyte ? Atc::Aws::RemoteFixityCheck::HTTP : Atc::Aws::RemoteFixityCheck::HTTP_POLLING
)
[response['checksum_hexdigest'], response['object_size'], response['error_message']]
end
Expand Down
73 changes: 70 additions & 3 deletions lib/atc/aws/remote_fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
# rubocop:disable Metrics/PerceivedComplexity

class Atc::Aws::RemoteFixityCheck
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 10.seconds
# This value shouldn't be too high because we want to detect stalled jobs fairly soon after they stall,
# but it should be high enough to account for downtime/delays related to CheckPlease app deployments.
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 1.minute
POLLING_DELAY = 2.seconds
MAX_WAIT_TIME_FOR_POLLING_JOB_START = 1.hour
WEBSOCKET = 'websocket'
HTTP = 'http'
HTTP_POLLING = 'http_polling'

def initialize(http_base_url, ws_url, auth_token)
@http_base_url = http_base_url
Expand All @@ -18,7 +23,7 @@ def initialize(http_base_url, ws_url, auth_token)
end

def http_client
@http_client ||= ::Faraday.new(url: @http_base_url) do |f|
@http_client ||= ::Faraday.new(url: @http_base_url, request: { timeout: CHECK_PLEASE['http_timeout'] }) do |f|
f.request :authorization, 'Bearer', @auth_token
f.response :json # decode response bodies as JSON
f.response :raise_error # raise 4xx and 5xx responses as errors
Expand All @@ -41,6 +46,8 @@ def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name, m
perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)['data']
when HTTP
perform_http(bucket_name, object_path, checksum_algorithm_name)
when HTTP_POLLING
perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
else
raise ArgumentError, "Unsupported perform method: #{method}"
end
Expand All @@ -61,7 +68,67 @@ def perform_http(bucket_name, object_path, checksum_algorithm_name)
JSON.parse(response.body)
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil, 'error_message' => "An unexpected error occurred: #{e.message}"
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
}
end

def perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
payload = {
'fixity_check' => {
'bucket_name' => bucket_name,
'object_path' => object_path,
'checksum_algorithm_name' => checksum_algorithm_name
}
}.to_json

fixity_check_create_response = http_client.post('/fixity_checks', payload) { |request|
request.headers['Content-Type'] = 'application/json'
}.body

if fixity_check_create_response['error_message'].present?
# Raise any unexpected error message. It will be handled elsewhere.
raise Atc::Exceptions::AtcError,
fixity_check_create_response['error_message']
end

# If we got here, that means that the fixity check request was created successfully.
# Now we'll poll and wait for it to complete.
last_progress_update_time = nil
fixity_check_response = nil
loop do
sleep POLLING_DELAY

fixity_check_response = http_client.get("/fixity_checks/#{fixity_check_create_response['id']}") { |request|
request.headers['Content-Type'] = 'application/json'
}.body

status = fixity_check_response['status']

break if ['success', 'error'].include?(status)

# If the fixity check has a 'pending' status, we'll skip this polling iteration and try again in the next poll
# operation. It is the reponsibility of the CheckPlease app to periodically clean up jobs that have been pending
# for too long, so we don't need to worry about that case in this app.
next if status == 'pending'

# If we got here, that means that the job is in progress. Let's account for the
# possibility of the job timing out, if something goes wrong on the CheckPlease app side.
last_progress_update_time = Time.zone.parse(fixity_check_response['updated_at'])
if job_unresponsive?(last_progress_update_time)
raise Atc::Exceptions::RemoteFixityCheckTimeout,
'Timed out while waiting for a response.'
end
rescue Faraday::Error => e
# If we run into a network error, log it, but continue looping
Rails.logger.info 'Error connecting to CheckPlease app during fixity check polling '\
"loop iteration: #{e.class.name} -> #{e.message}"
end
fixity_check_response
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
}
end

Expand Down
1 change: 1 addition & 0 deletions lib/atc/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ class TransferError < AtcError; end
class ObjectExists < AtcError; end
class StorageProviderMappingNotFound < AtcError; end
class RemoteFixityCheckTimeout < AtcError; end
class PollingWaitTimeoutError < AtcError; end
end
2 changes: 1 addition & 1 deletion lib/tasks/aws.rake
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace :atc do
)
response = remote_fixity_check.perform(
job_identifier, bucket_name, object_path, checksum_algorithm_name,
Atc::Aws::RemoteFixityCheck::HTTP
Atc::Aws::RemoteFixityCheck::HTTP_POLLING
)
puts "Response: #{response.inspect}"
end
Expand Down
20 changes: 20 additions & 0 deletions lib/tasks/comparison.rake
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,25 @@ namespace :atc do

puts "Done!"
end

desc 'List multipart info for an S3 object (including part size)'
# Example usage: bucket_name=cul-dlstor-digital-testing1 path='test-5gb-file-ubuntu.iso'
task list_multipart_info: :environment do
bucket_name = ENV['bucket_name']
path = ENV['path']

attributes = S3_CLIENT.get_object_attributes(
bucket: bucket_name,
key: path,
object_attributes: %w[ETag Checksum ObjectParts StorageClass ObjectSize]
)

if attributes.object_parts.blank?
puts "Single part file."
else
puts "Found multipart file (#{attributes.object_parts.total_parts_count} parts) "\
"with individual part array data for #{attributes.object_parts.parts.length} parts."
end
end
end
end
2 changes: 1 addition & 1 deletion spec/atc/aws/remote_fixity_check_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
{
'checksum_hexdigest' => nil,
'object_size' => nil,
'error_message' => 'An unexpected error occurred: oh no!'
'error_message' => 'An unexpected error occurred: StandardError -> oh no!'
}
)
end
Expand Down
4 changes: 3 additions & 1 deletion spec/jobs/verify_fixity_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@
expect(aws_fixity_verification_pending).to receive(:update!).and_call_original
verify_fixity_job.handle_unexpected_error(aws_fixity_verification_pending, StandardError.new('oh no!'))
expect(aws_fixity_verification_pending.status).to eq('failure')
expect(aws_fixity_verification_pending.error_message).to eq('An unexpected error occurred: oh no!')
expect(aws_fixity_verification_pending.error_message).to eq(
'An unexpected error occurred: StandardError -> oh no!'
)
expect(aws_fixity_verification_pending.changed?).to eq(false) # verify that the record has no unsaved changes
end
end
Expand Down
Loading