Skip to content

Commit

Permalink
Switch from websocket to http in Atc::Aws::FixityCheck class; Also re…
Browse files Browse the repository at this point in the history
…factor code to simplify swapping between WEBSOCKET and HTTP fixity check methods
  • Loading branch information
elohanlon committed Jul 26, 2024
1 parent 3950ef8 commit 5298789
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 98 deletions.
10 changes: 10 additions & 0 deletions app/jobs/verify_fixity_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ def perform(stored_object_id)
fixity_verification_record = create_pending_fixity_verification stored_object
provider_fixity_check = instantiate_provider_fixity_check fixity_verification_record
verify_fixity(fixity_verification_record, provider_fixity_check)
rescue StandardError => e
handle_unexpected_error(fixity_verification_record, e) unless fixity_verification_record.nil?
end

def handle_unexpected_error(fixity_verification_record, err)
fixity_verification_record.update!(
status: :failure,
error_message: "An unexpected error occurred: #{err.message}"
)
end

def process_existing_fixity_verification_record(existing_fixity_verification_record)
Expand Down Expand Up @@ -49,6 +58,7 @@ def instantiate_provider_fixity_check(fixity_verification_record)

def verify_fixity(fixity_verification_record, provider_fixity_check)
object_checksum, object_size, fixity_check_error = provider_fixity_check.fixity_checksum_object_size

if fixity_check_error.present?
fixity_verification_record.error_message = fixity_check_error
fixity_verification_record.failure!
Expand Down
44 changes: 14 additions & 30 deletions lib/atc/aws/fixity_check.rb
Original file line number Diff line number Diff line change
@@ -1,41 +1,25 @@
# frozen_string_literal: true

class Atc::Aws::FixityCheck
def initialize(stored_object, stream_id)
def initialize(stored_object, fixity_check_identifier)
@bucket_name = stored_object.storage_provider.container_name
@object_path = stored_object.path
@fixity_checksum_algorithm = stored_object.source_object.fixity_checksum_algorithm
@stream_id = stream_id
@fixity_check_identifier = fixity_check_identifier
end

# Returns an array with the checksum, object size, and (if something went wrong) an error error_message.
# If there is an error, checksum and object size will be nil. If there is not an error,
# checksum and object size will be non-nil and error will be nil.
# @return [Array(String, Integer, String)] A 3-element array containing: [checksum, object_size, error_message]
def fixity_checksum_object_size
aws_fixity_check_response =
aws_fixity_websocket_channel_stream(@bucket_name,
@object_path,
@fixity_checksum_algorithm.name.downcase,
@stream_id)
case aws_fixity_check_response['type']
when 'fixity_check_complete'
[aws_fixity_check_response['data']['checksum_hexdigest'], aws_fixity_check_response['data']['object_size'], nil]
when 'fixity_check_error'
# if only want to return the error from the AWS fixity response, without data,
# use the following commented-out line instead of the last line
# [nil, nil, aws_fixity_check_response['data']['error_message']]
[nil, nil, response_data_as_string(aws_fixity_check_response)]
end
end

def response_data_as_string(aws_fixity_check_response)
# AWS response data contains, among other things, the error message
"AWS error response with the following data: #{aws_fixity_check_response['data']} "
end

def aws_fixity_websocket_channel_stream(bucket_name,
object_path,
checksum_algorithm_name,
job_identifier)
# Response received (Hash) is returned as-is.
remote_fixity_check = Atc::Aws::RemoteFixityCheck.new(CHECK_PLEASE['ws_url'], CHECK_PLEASE['auth_token'])
remote_fixity_check.perform(job_identifier, bucket_name, object_path, checksum_algorithm_name)
response = Atc::Aws::RemoteFixityCheck.new(
CHECK_PLEASE['http_base_url'], CHECK_PLEASE['ws_url'], CHECK_PLEASE['auth_token']
).perform(
@fixity_check_identifier, @bucket_name,
@object_path, @fixity_checksum_algorithm.name.downcase,
Atc::Aws::RemoteFixityCheck::HTTP
)
[response['checksum_hexdigest'], response['object_size'], response['error_message']]
end
end
9 changes: 6 additions & 3 deletions lib/atc/aws/remote_fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(http_base_url, ws_url, auth_token)
end

def http_client
@http_conn = ::Faraday.new(url: @http_base_url) do |f|
@http_client ||= ::Faraday.new(url: @http_base_url) do |f|
f.request :authorization, 'Bearer', @auth_token
f.response :json # decode response bodies as JSON
f.response :raise_error # raise 4xx and 5xx responses as errors
Expand All @@ -38,7 +38,7 @@ def create_websocket_connection
def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name, method = WEBSOCKET)
case method
when WEBSOCKET
perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)
perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)['data']
when HTTP
perform_http(bucket_name, object_path, checksum_algorithm_name)
else
Expand All @@ -54,12 +54,15 @@ def perform_http(bucket_name, object_path, checksum_algorithm_name)
'checksum_algorithm_name' => checksum_algorithm_name
}
}.to_json

response = http_client.post('/fixity_checks/run_fixity_check_for_s3_object', payload) do |request|
request.headers['Content-Type'] = 'application/json'
end

JSON.parse(response.body)
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil, 'error_message' => "An unexpected error occurred: #{e.message}"
}
end

def perform_websocket(job_identifier, bucket_name, object_path, checksum_algorithm_name)
Expand Down
65 changes: 35 additions & 30 deletions spec/atc/aws/fixity_check_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,55 @@
source_object: source_object,
storage_provider: aws_storage_provider)
end
let(:aws_hash_response) do
{ 'type' => 'fixity_check_complete',
'data' => { 'checksum_hexdigest' => 'ABCDEF12345', 'object_size' => 1234 } }
end
let(:aws_error_hash_response) do
{ 'type' => 'fixity_check_error',
'data' => { 'error_message' => 'Ooops!',
'job_identifier' => 1234,
'bucket_name' => 'cul_bucket',
'object_path' => 'I/Am/An/Object',
'checksum_algorithm_name' => 'SHA31415' } }
end

describe '#fixity_checksum_object_size' do
context 'with an AWS response without errors ' do
it 'returns the object checksum and object size, and nil for the aws error message' do
allow(aws_fixity_check).to receive(:aws_fixity_websocket_channel_stream) { aws_hash_response }
let(:remote_fixity_check) do
dbl = instance_double(Atc::Aws::RemoteFixityCheck)
allow(dbl).to receive(:perform).and_return(remote_fixity_check_perform_response)
dbl
end

before do
allow(Atc::Aws::RemoteFixityCheck).to receive(:new).and_return(remote_fixity_check)
end

context 'with a response without errors' do
let(:remote_fixity_check_perform_response) do
{
'checksum_hexdigest' => 'ABCDEF12345',
'object_size' => 1234
}
end

it 'returns the object checksum and object size, and nil for the error message' do
result = aws_fixity_check.fixity_checksum_object_size
expect(result).to eq(['ABCDEF12345', 1234, nil])
expect(result).to eq(
[
remote_fixity_check_perform_response['checksum_hexdigest'],
remote_fixity_check_perform_response['object_size'],
nil
]
)
end
end

context 'with an AWS response with errors ' do
context 'with a response with errors' do
let(:remote_fixity_check_perform_response) do
{
'error_message' => 'Ooops!'
}
end

it 'returns nil for the object checksum and object size' do
allow(aws_fixity_check).to receive(:aws_fixity_websocket_channel_stream) { aws_error_hash_response }
result = aws_fixity_check.fixity_checksum_object_size
expect(result[0]).to eq nil
expect(result[1]).to eq nil
end

it 'returns the error message (including data)' do
allow(aws_fixity_check).to receive(:aws_fixity_websocket_channel_stream) { aws_error_hash_response }
it 'returns the error message' do
result = aws_fixity_check.fixity_checksum_object_size
expect(result[2]).to include('Ooops!')
expect(result[2]).to include('cul_bucket')
expect(result[2]).to eq(remote_fixity_check_perform_response['error_message'])
end
end
end

describe '#response_data_as_string' do
it 'returns the aws response data info as a string' do
result = aws_fixity_check.response_data_as_string aws_error_hash_response
expect(result).to include('Ooops!')
expect(result).to include('cul_bucket')
end
end
end
118 changes: 83 additions & 35 deletions spec/atc/aws/remote_fixity_check_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,59 +45,103 @@
'object_size' => 123
}
end
let(:fixity_check_complete_message) do
let(:websocket_fixity_check_complete_message_content) do
{
'type' => 'fixity_check_complete',
'data' => successful_fixity_check_response_data
}
end
let(:websocket_fixity_check_complete_message) do
{
'identifier' => { 'channel' => 'FixityCheckChannel', 'job_identifier' => job_identifier }.to_json,
'message' => {
'type' => 'fixity_check_complete',
'data' => successful_fixity_check_response_data
}.to_json
'message' => websocket_fixity_check_complete_message_content.to_json
}
end

describe '#perform' do
let(:result) do
remote_fixity_check.perform(
job_identifier, bucket_name, object_path, checksum_algorithm_name, method
)
end

context 'with method argument of Atc::Aws::RemoteFixityCheck::WEBSOCKET' do
let(:method) { Atc::Aws::RemoteFixityCheck::WEBSOCKET }

it 'works as expected' do
allow(remote_fixity_check).to receive(:create_websocket_connection).and_return(mock_websocket)

job_response = nil
t = Thread.new do
job_response = remote_fixity_check.perform(
job_identifier, bucket_name, object_path, checksum_algorithm_name, method
)
end

# Wait a moment to allow the job in the other thread to start
sleep 2

# Manually trigger a message
mock_websocket.trigger(:message, OpenStruct.new(data: fixity_check_complete_message.to_json))

# Wait for the thread to finish
t.join
before do
allow(remote_fixity_check).to receive(:perform_websocket).with(
job_identifier, bucket_name, object_path, checksum_algorithm_name
).and_return(websocket_fixity_check_complete_message_content)
end

expect(job_response).to eq(JSON.parse(fixity_check_complete_message['message']))
it 'invokes the expected underlying methd and returns the expected result' do
expect(result).to eq(successful_fixity_check_response_data)
end
end

context 'with method argument of Atc::Aws::RemoteFixityCheck::HTTP' do
let(:method) { Atc::Aws::RemoteFixityCheck::HTTP }

before do
stub_request(:post, "#{check_please_app_base_http_url}/fixity_checks/run_fixity_check_for_s3_object").to_return(
body: successful_fixity_check_response_data.to_json
allow(remote_fixity_check).to receive(:perform_http).with(
bucket_name, object_path, checksum_algorithm_name
).and_return(successful_fixity_check_response_data)
end

it 'invokes the expected underlying methd and returns the expected result' do
expect(result).to eq(successful_fixity_check_response_data)
end
end
end

describe '#perform_websocket' do
it 'works as expected' do
allow(remote_fixity_check).to receive(:create_websocket_connection).and_return(mock_websocket)

result = nil
t = Thread.new do
result = remote_fixity_check.perform_websocket(
job_identifier, bucket_name, object_path, checksum_algorithm_name
)
end

it 'works as expected' do
job_response = remote_fixity_check.perform(
job_identifier, bucket_name, object_path, checksum_algorithm_name, method
# Wait a moment to allow the job in the other thread to start
sleep 2

# Manually trigger a message
mock_websocket.trigger(:message, OpenStruct.new(data: websocket_fixity_check_complete_message.to_json))

# Wait for the thread to finish
t.join

expect(result).to eq(websocket_fixity_check_complete_message_content)
end
end

describe '#perform_http' do
before do
stub_request(:post, "#{check_please_app_base_http_url}/fixity_checks/run_fixity_check_for_s3_object").to_return(
body: successful_fixity_check_response_data.to_json
)
end

it 'works as expected' do
expect(
remote_fixity_check.perform_http(
bucket_name, object_path, checksum_algorithm_name
)
).to eq(successful_fixity_check_response_data)
end

expect(job_response).to eq(successful_fixity_check_response_data)
end
it 'handles unexpected errors' do
allow(remote_fixity_check.http_client).to receive(:post).and_raise(StandardError, 'oh no!')
expect(remote_fixity_check.perform_http(bucket_name, object_path, checksum_algorithm_name)).to eq(
{
'checksum_hexdigest' => nil,
'object_size' => nil,
'error_message' => 'An unexpected error occurred: oh no!'
}
)
end
end

Expand Down Expand Up @@ -211,7 +255,7 @@
}
end
# fixity_check_complete messages are a type of custom message
let(:fixity_check_complete_message) do
let(:websocket_fixity_check_complete_message) do
{
'identifier' => {
'job_identifier' => job_identifier
Expand All @@ -238,7 +282,7 @@
describe '#custom_message?' do
it 'returns true when matching data is supplied' do
expect(remote_fixity_check.custom_message?(progress_message, job_identifier)).to eq(true)
expect(remote_fixity_check.custom_message?(fixity_check_complete_message, job_identifier)).to eq(true)
expect(remote_fixity_check.custom_message?(websocket_fixity_check_complete_message, job_identifier)).to eq(true)
expect(remote_fixity_check.custom_message?(fixity_check_error_message, job_identifier)).to eq(true)
end

Expand All @@ -256,14 +300,18 @@
end

it 'returns false when non-matching data is supplied' do
expect(remote_fixity_check.progress_message?(fixity_check_complete_message, job_identifier)).to eq(false)
expect(
remote_fixity_check.progress_message?(websocket_fixity_check_complete_message, job_identifier)
).to eq(false)
end
end

describe '#fixity_check_complete_or_error_message?' do
it 'returns true when matching data is supplied' do
expect(
remote_fixity_check.fixity_check_complete_or_error_message?(fixity_check_complete_message, job_identifier)
remote_fixity_check.fixity_check_complete_or_error_message?(
websocket_fixity_check_complete_message, job_identifier
)
).to eq(true)
expect(
remote_fixity_check.fixity_check_complete_or_error_message?(fixity_check_error_message, job_identifier)
Expand Down
15 changes: 15 additions & 0 deletions spec/jobs/verify_fixity_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@
verify_fixity_job.perform(gcp_stored_object.id)
end
end

it 'handles unexpected errors' do
allow(verify_fixity_job).to receive(:verify_fixity).and_raise(StandardError, 'oh no!')
expect(verify_fixity_job).to receive(:handle_unexpected_error)
verify_fixity_job.perform(aws_stored_object.id)
end
end

describe '#handle_unexpected_error' do
it 'updates the given FixityVerificationRecord as expected' do
verify_fixity_job.handle_unexpected_error(aws_fixity_verification_pending, StandardError.new('oh no!'))
expect(aws_fixity_verification_pending.status).to eq('failure')
expect(aws_fixity_verification_pending.error_message).to eq('An unexpected error occurred: oh no!')
expect(aws_fixity_verification_pending.changed?).to eq(false) # verify that the record has no unsaved changes
end
end

describe '#process_existing_fixity_verification_record' do
Expand Down

0 comments on commit 5298789

Please sign in to comment.