Skip to content

Commit

Permalink
AIP-loading rake task plus supporting code
Browse files Browse the repository at this point in the history
  • Loading branch information
elohanlon committed Nov 3, 2024
1 parent 94b55e7 commit ba9b3e9
Show file tree
Hide file tree
Showing 28 changed files with 2,413 additions and 29 deletions.
11 changes: 11 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@ AllCops:
- 'lib/tasks/**/*'
- 'tmp/**/*'

Layout/LineLength:
Exclude:
- lib/atc/aip_reader.rb
- spec/atc/aip_reader_spec.rb

Metrics/AbcSize:
Exclude:
- lib/atc/aip_reader.rb

Metrics/ClassLength:
Exclude:
- 'lib/atc/aws/remote_fixity_check.rb'

Metrics/MethodLength:
Exclude:
- lib/atc/aip_reader.rb
- 'lib/atc/loaders/checksum_loader.rb'
- 'lib/atc/utils/aws_multipart_checksum_utils.rb'
- 'lib/atc/aws/s3_uploader.rb'
Expand All @@ -31,6 +41,7 @@ Metrics/MethodLength:
Rails/Output:
Exclude:
- 'lib/atc/aws/s3_uploader.rb'
- 'lib/atc/aip_reader.rb'

Lint/MissingCopEnableDirective:
Enabled: false
Expand Down
5 changes: 4 additions & 1 deletion app/jobs/perform_transfer_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def perform(pending_transfer_id)

# If we got here, that means that the upload was successful. We can convert this
# PendingTransfer record into a StoredObject record.
StoredObject.create!(
stored_object = StoredObject.create!(
path: previously_attempted_stored_paths.last,
source_object: pending_transfer.source_object,
storage_provider: pending_transfer.storage_provider,
Expand All @@ -92,6 +92,9 @@ def perform(pending_transfer_id)

# And then delete the PendingTransfer record because it's no longer needed:
pending_transfer.destroy

# And finally, queue a fixity check job for the successfully transferred object
VerifyFixityJob.perform_later(stored_object.id)
rescue StandardError => e
unless e.is_a?(ActiveRecord::RecordNotFound)
# If an unexpected error occurs, capture it and mark this job as a failure.
Expand Down
3 changes: 2 additions & 1 deletion app/jobs/verify_fixity_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def perform(stored_object_id)
def handle_unexpected_error(fixity_verification_record, err)
fixity_verification_record.update!(
status: :failure,
error_message: "An unexpected error occurred: #{err.class.name} -> #{err.message}"
error_message: "An unexpected error occurred: #{err.class.name} -> "\
"#{err.message}\n\t#{err.backtrace.join("\n\t")}"
)
end

Expand Down
2 changes: 2 additions & 0 deletions app/models/source_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def self.for_path(local_file_path)
def storage_providers_for_source_path
@storage_providers_for_source_path ||= begin
storage_providers = []

ATC[:source_paths_to_storage_providers]&.each do |path_prefix, config|
next unless self.path.start_with?(path_prefix.to_s)

Expand All @@ -45,6 +46,7 @@ def storage_providers_for_source_path
storage_providers << storage_provider unless storage_provider.nil?
end
end

# If this method is being called, we expect there to be a storage provider for this source_object's path.
# So if no storage_providers were found, raise an exception.
if storage_providers.empty?
Expand Down
157 changes: 157 additions & 0 deletions lib/atc/aip_reader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# frozen_string_literal: true

# Reads an AIP directory that is laid out as a bag (a `data` directory plus bagit.txt, bag-info.txt,
# manifest-*.txt and tagmanifest-*.txt files) and exposes a mapping of every file in the AIP to the
# checksum recorded for it in the manifest/tagmanifest files.
#
# Instantiating a reader performs all validation up front, so a successfully constructed instance
# has a complete file-path-to-checksum map.
class Atc::AipReader
  SUPPORTED_CHECKSUM_ALGORITHMS_IN_ORDER_OF_PREFERENCE = ['sha256', 'sha512', 'md5'].freeze

  attr_reader :path, :manifest_file_path, :tagmanifest_file_path, :checksum_type, :file_path_to_checksum_map

  # @param aip_path [String] Path to the root directory of the AIP.
  # @param verbose [Boolean] When true, progress information is printed to stdout.
  # @raise [Atc::Exceptions::UnreadableAip] if any file in the AIP cannot be read.
  # @raise [Atc::Exceptions::InvalidAip] if required manifest files or expected bag content are missing.
  # @raise [Atc::Exceptions::MissingAipChecksums] if any file in the AIP has no recorded checksum.
  def initialize(aip_path, verbose: false)
    @verbose = verbose
    @path = aip_path
    # The verbose flag must be passed explicitly: generate_file_list is a class method and
    # therefore cannot see this instance's @verbose.
    @file_list = self.class.generate_file_list(path, verbose: @verbose)
    @manifest_file_path, @tagmanifest_file_path = select_best_manifest_files
    validate!
    # Only the file name is inspected, so characters in parent directory names cannot affect the result.
    @checksum_type = File.basename(@manifest_file_path)[/\Amanifest-(.+)\.txt\z/, 1]
    generate_file_path_to_checksum_map!
    ensure_file_list_checksum_coverage!
  end

  # Iterates over each file in this AIP, yielding the full file path and checksum.
  # @yield [file_path, checksum]
  def each_file_with_checksum(&block)
    file_path_to_checksum_map.each(&block)
  end

  # @visibility private
  # Iterates over all files in the AIP (regardless of whether they appear in the manifest or tagmanifest files)
  # and returns an array with all paths. If any unreadable files are encountered, an error is raised with a message
  # that details which files could not be read.
  # @param directory_path [String] Directory to read recursively.
  # @param verbose [Boolean] When true, a running file count is printed to stdout.
  # @return [Array<String>] Paths of all (readable) files found.
  # @raise [Atc::Exceptions::UnreadableAip] if one or more files are not readable.
  def self.generate_file_list(directory_path, verbose: false)
    readable_files = []
    unreadable_files = []
    counter = 0
    print 'Generating file list (0)...' if verbose
    Atc::Utils::FileUtils.stream_recursive_directory_read(directory_path) do |file_path|
      (File.readable?(file_path) ? readable_files : unreadable_files) << file_path
      print "\rGenerating file list (#{counter += 1})..." if verbose
    end
    puts '' if verbose

    unless unreadable_files.empty?
      raise Atc::Exceptions::UnreadableAip,
            "The following files could not be read:\n#{unreadable_files.sort.join("\n")}"
    end

    readable_files
  end

  # @visibility private
  # This method ensures that all files in @file_list have a corresponding checksum and raises an error if any files
  # do not have a checksum.
  # @raise [Atc::Exceptions::MissingAipChecksums] listing every file that lacks a checksum.
  def ensure_file_list_checksum_coverage!
    files_without_checksums = @file_list.reject { |file_path| file_path_to_checksum_map.key?(file_path) }

    return if files_without_checksums.empty?

    raise Atc::Exceptions::MissingAipChecksums,
          "The following files did not have associated checksums in the manifest or tagmanifest files:\n#{files_without_checksums.sort.join("\n")}"
  end

  # @visibility private
  # Generates the file path to checksum map for this AIP and assigns it to @file_path_to_checksum_map.
  # This method uses the manifest and tagmanifest files to generate a mapping of AIP files to associated checksums.
  # A checksum is also dynamically generated for the tagmanifest file because its checksum would not appear in the
  # manifest or tagmanifest files.
  # @raise [Atc::Exceptions::InvalidAip] if a non-blank manifest line does not contain both a checksum and a path.
  def generate_file_path_to_checksum_map!
    file_paths_to_checksums = {}
    counter = 0
    print 'Generating AIP checksum mapping (0)...' if @verbose
    [tagmanifest_file_path, manifest_file_path].each do |checksum_source_file|
      File.foreach(checksum_source_file) do |line|
        stripped_line = line.strip
        # Tolerate blank lines (e.g. a trailing empty line) instead of crashing on them.
        next if stripped_line.empty?

        checksum, aip_relative_path = stripped_line.split(' ', 2)
        if aip_relative_path.nil?
          raise Atc::Exceptions::InvalidAip,
                "Malformed line in #{checksum_source_file} (expected \"<checksum> <path>\"): #{stripped_line}"
        end

        file_paths_to_checksums[File.join(path, aip_relative_path)] = checksum
        print "\rGenerating AIP checksum mapping (#{counter += 1})..." if @verbose
      end
    end
    puts '' if @verbose

    # And we'll manually generate a checksum for the tagmanifest file, since it doesn't contain its own checksum
    file_paths_to_checksums[tagmanifest_file_path] =
      "Digest::#{checksum_type.upcase}".constantize.file(tagmanifest_file_path).hexdigest

    @file_path_to_checksum_map = file_paths_to_checksums
  end

  # @visibility private
  # Raises an error unless this is a valid AIP (i.e. it has a supported manifest file, a tagmanifest file
  # that uses the same checksum algorithm, and the minimum set of expected files and subdirectories).
  # @raise [Atc::Exceptions::InvalidAip]
  def validate!
    if manifest_file_path.nil?
      raise Atc::Exceptions::InvalidAip,
            'Could not find supported manifest file (need sha256, sha512, or md5).'
    end

    if tagmanifest_file_path.nil?
      # tagmanifest_file_path is nil in this branch, so report the path that was expected to exist.
      expected_tagmanifest_path = File.join(path, "tag#{File.basename(manifest_file_path)}")
      raise Atc::Exceptions::InvalidAip,
            "Could not find tagmanifest file with checksum algorithm matching manifest file: #{expected_tagmanifest_path}"
    end

    validate_aip_content_glob_patterns!
  end

  # @visibility private
  # Checks to see if this AIP has the minimum set of expected files and subdirectories.
  # @raise [Atc::Exceptions::InvalidAip] listing the expected entries that could not be found.
  def validate_aip_content_glob_patterns!
    glob_patterns_to_check = ['data', 'bagit.txt', 'bag-info.txt', 'manifest-*.txt', 'tagmanifest-*.txt'].map do |val|
      File.join(path, val)
    end

    missing = glob_patterns_to_check.select { |expected_file_or_directory| Dir.glob(expected_file_or_directory).blank? }

    return if missing.empty?

    raise Atc::Exceptions::InvalidAip,
          "The following expected files/directories are missing from this AIP: #{missing.sort.join("\n")}"
  end

  # @visibility private
  # Selects the best manifest files available, preferring sha256 first, then sha512, and then md5.
  # Other checksum algorithms are not supported at this time and will be ignored.
  # @return [Array] An array of two elements: the first is the manifest file path and the second is a tagmanifest path.
  #                 If no supported-algorithm manifest file is found, both elements will be nil. If a supported
  #                 manifest file is found, the tagmanifest path will be for a file that matches the checksum
  #                 algorithm of the manifest file, or will be nil if no such file exists.
  def select_best_manifest_files
    manifest_algorithm = SUPPORTED_CHECKSUM_ALGORITHMS_IN_ORDER_OF_PREFERENCE.find do |checksum_algorithm|
      File.exist?(manifest_path_for_checksum_algorithm(checksum_algorithm))
    end

    return [nil, nil] if manifest_algorithm.nil?

    possible_tagmanifest_path = tagmanifest_path_for_checksum_algorithm(manifest_algorithm)

    [
      manifest_path_for_checksum_algorithm(manifest_algorithm),
      File.exist?(possible_tagmanifest_path) ? possible_tagmanifest_path : nil
    ]
  end

  # @visibility private
  # Generates the full path to the manifest file for the given checksum algorithm.
  def manifest_path_for_checksum_algorithm(checksum_algorithm)
    File.join(path, "manifest-#{checksum_algorithm}.txt")
  end

  # @visibility private
  # Generates the full path to the tagmanifest file for the given checksum algorithm.
  def tagmanifest_path_for_checksum_algorithm(checksum_algorithm)
    File.join(path, "tagmanifest-#{checksum_algorithm}.txt")
  end
end
10 changes: 9 additions & 1 deletion lib/atc/aws/fixity_check.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

class Atc::Aws::FixityCheck
HTTP_POLLING_THRESHOLD = 2.gigabytes

def initialize(stored_object, fixity_check_identifier)
@bucket_name = stored_object.storage_provider.container_name
@object_path = stored_object.path
Expand All @@ -21,8 +23,14 @@ def fixity_checksum_object_size
@object_path, @fixity_checksum_algorithm.name.downcase,
# A synchronous HTTP check is faster than polling for smaller files,
# so we'll only use the polling method for larger files.
@expected_object_size < 1.gigabyte ? Atc::Aws::RemoteFixityCheck::HTTP : Atc::Aws::RemoteFixityCheck::HTTP_POLLING
self.remote_fixity_check_method
)
[response['checksum_hexdigest'], response['object_size'], response['error_message']]
end

# Chooses which remote fixity check method to use based on the expected object size:
# objects smaller than HTTP_POLLING_THRESHOLD use the HTTP method, and all other
# objects use the HTTP polling method.
# @return [String] Atc::Aws::RemoteFixityCheck::HTTP or Atc::Aws::RemoteFixityCheck::HTTP_POLLING
def remote_fixity_check_method
  if @expected_object_size < HTTP_POLLING_THRESHOLD
    Atc::Aws::RemoteFixityCheck::HTTP
  else
    Atc::Aws::RemoteFixityCheck::HTTP_POLLING
  end
end
end
27 changes: 13 additions & 14 deletions lib/atc/aws/remote_fixity_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
class Atc::Aws::RemoteFixityCheck
# This value shouldn't be too high because we want to detect stalled jobs fairly soon after they stall,
# but it should be high enough to account for downtime/delays related to CheckPlease app deployments.
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 1.minute
STALLED_FIXITY_CHECK_JOB_TIMEOUT = 2.minutes
POLLING_DELAY = 2.seconds
MAX_WAIT_TIME_FOR_POLLING_JOB_START = 1.hour
WEBSOCKET = 'websocket'
HTTP = 'http'
HTTP_POLLING = 'http_polling'
Expand All @@ -25,7 +24,6 @@ def initialize(http_base_url, ws_url, auth_token)
def http_client
@http_client ||= ::Faraday.new(url: @http_base_url, request: { timeout: CHECK_PLEASE['http_timeout'] }) do |f|
f.request :authorization, 'Bearer', @auth_token
f.response :json # decode response bodies as JSON
f.response :raise_error # raise 4xx and 5xx responses as errors
f.adapter :net_http # Use the Net::HTTP adapter
end
Expand Down Expand Up @@ -61,15 +59,14 @@ def perform_http(bucket_name, object_path, checksum_algorithm_name)
'checksum_algorithm_name' => checksum_algorithm_name
}
}.to_json
response = http_client.post('/fixity_checks/run_fixity_check_for_s3_object', payload) do |request|
request.headers['Content-Type'] = 'application/json'
end

JSON.parse(response.body)
JSON.parse(http_client.post('/fixity_checks/run_fixity_check_for_s3_object', payload) { |request|
request.headers['Content-Type'] = 'application/json'
}.body)
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}\n\t#{e.backtrace.join("\n\t")}"
}
end

Expand All @@ -82,9 +79,9 @@ def perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
}
}.to_json

fixity_check_create_response = http_client.post('/fixity_checks', payload) { |request|
fixity_check_create_response = JSON.parse(http_client.post('/fixity_checks', payload) { |request|
request.headers['Content-Type'] = 'application/json'
}.body
}.body)

if fixity_check_create_response['error_message'].present?
# Raise any unexpected error message. It will be handled elsewhere.
Expand All @@ -99,9 +96,11 @@ def perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
loop do
sleep POLLING_DELAY

fixity_check_response = http_client.get("/fixity_checks/#{fixity_check_create_response['id']}") { |request|
request.headers['Content-Type'] = 'application/json'
}.body
fixity_check_response = JSON.parse(
http_client.get("/fixity_checks/#{fixity_check_create_response['id']}") { |request|
request.headers['Content-Type'] = 'application/json'
}.body
)

status = fixity_check_response['status']

Expand All @@ -128,7 +127,7 @@ def perform_http_polling(bucket_name, object_path, checksum_algorithm_name)
rescue StandardError => e
{
'checksum_hexdigest' => nil, 'object_size' => nil,
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}"
'error_message' => "An unexpected error occurred: #{e.class.name} -> #{e.message}\n\t#{e.backtrace.join("\n\t")}"
}
end

Expand Down
5 changes: 5 additions & 0 deletions lib/atc/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ class ObjectExists < AtcError; end
class StorageProviderMappingNotFound < AtcError; end
class RemoteFixityCheckTimeout < AtcError; end
class PollingWaitTimeoutError < AtcError; end

class AipLoadError < AtcError; end
class InvalidAip < AipLoadError; end
class UnreadableAip < AipLoadError; end
class MissingAipChecksums < AipLoadError; end
end
46 changes: 46 additions & 0 deletions lib/tasks/atc/aip.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
namespace :atc do
  namespace :aip do
    desc 'Load files from an AIP into ATC, load checksums from the AIP manifest, and initiate transfer and verification processes.'
    # Environment variables read by this task:
    #   path    - (required) path to the AIP directory to load
    #   dry_run - when set to 'true', files are counted but no records are created and no jobs are queued
    task load: :environment do
      aip_path = ENV['path']
      dry_run = ENV['dry_run'] == 'true'

      # Fail fast with a usage message instead of an opaque error from deep inside the AIP reader.
      if aip_path.nil? || aip_path.strip.empty?
        abort 'Please specify the AIP to load by supplying a path argument (e.g. path=/path/to/aip).'
      end

      aip_reader = Atc::AipReader.new(aip_path, verbose: true)

      # Identify checksum type for this AIP (sha256, sha512, or md5) and retrieve the associated ChecksumAlgorithm object.
      # find_by! is used so that a missing ChecksumAlgorithm record stops the task before any files are processed.
      fixity_checksum_algorithm = ChecksumAlgorithm.find_by!(name: aip_reader.checksum_type.upcase)

      # Iterate over files and load them into the ATC database (skipping files that already exist)
      source_object_counter = 0
      previously_inventoried_file_counter = 0

      # NOTE(review): print_inventory_addition_progress is not defined in this file; presumably it is
      # provided by another rake file that is loaded alongside this one -- confirm.
      print_inventory_addition_progress(source_object_counter, previously_inventoried_file_counter, dry_run)
      aip_reader.each_file_with_checksum do |file_path, hex_checksum|
        unless dry_run
          source_object = SourceObject.create!(
            path: file_path,
            object_size: File.size(file_path),
            fixity_checksum_algorithm: fixity_checksum_algorithm,
            fixity_checksum_value: Atc::Utils::HexUtils.hex_to_bin(hex_checksum)
          )
          PrepareTransferJob.perform_later(source_object.id, enqueue_successor: true)
        end
        source_object_counter += 1
        print_inventory_addition_progress(source_object_counter, previously_inventoried_file_counter, dry_run)
      rescue ActiveRecord::RecordNotUnique
        # Skipping file because it was previously added to the inventory
        previously_inventoried_file_counter += 1
        print_inventory_addition_progress(source_object_counter, previously_inventoried_file_counter, dry_run)
      end
      print_inventory_addition_progress(source_object_counter, previously_inventoried_file_counter, dry_run)
      puts "\nDone!"
    rescue Atc::Exceptions::AipLoadError => e
      puts "An error has occurred (#{e.class.name}):\n" + Rainbow(e.message).red
    end
  end
end
Loading

0 comments on commit ba9b3e9

Please sign in to comment.