Skip to content

Commit

Permalink
more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dma1dma1 committed Dec 6, 2023
1 parent 36509ff commit 6c64e56
Showing 1 changed file with 57 additions and 53 deletions.
110 changes: 57 additions & 53 deletions codalab/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,56 +428,59 @@ def migrate_bundle(self, bundle_uuid):
bundle_location = self.get_bundle_location(bundle_uuid)

# This is for handling cases where rm -d was run on the bundle
is_bundle_rm = False
try:
bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
except Exception as e:
if "Path ''" in str(e):
pass
is_bundle_rm = True
else:
raise e

is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED:
return
elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.status = MigrationStatus.CHANGED_DB
elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)[0]):
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE

siz = path_util.get_path_size(bundle_location)
if siz > 5e8:
self.logger.info("Skipping bundle %s with size %s", bundle_uuid, siz)
return

# Upload to Azure.
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE

# Normal Migration
if not is_bundle_rm:
is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED:
return
elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.status = MigrationStatus.CHANGED_DB
elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)[0]):
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE

siz = path_util.get_path_size(bundle_location)
if siz > 5e8:
self.logger.info("Skipping bundle %s with size %s", bundle_uuid, siz)
return

# Upload to Azure.
if not is_bundle_rm and not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE

# Change bundle metadata in database to point to the Azure Blob location (not disk)
if self.change_db and not bundle_migration_status.changed_db():
Expand All @@ -487,15 +490,16 @@ def migrate_bundle(self, bundle_uuid):
self.times["modify_bundle_data"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.CHANGED_DB

# Delete the bundle from disk.
if self.delete:
self.logger.info("Deleting from disk")
start_time = time.time()
if os.path.lexists(disk_location):
# Delete it.
path_util.remove(disk_bundle_location)
self.times["delete_original_bundle"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.FINISHED
if not is_bundle_rm:
# Delete the bundle from disk.
if self.delete:
self.logger.info("Deleting from disk")
start_time = time.time()
if os.path.lexists(disk_location):
# Delete it.
path_util.remove(disk_bundle_location)
self.times["delete_original_bundle"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.FINISHED

self.times["migrate_bundle"].append(time.time() - total_start_time)

Expand Down

0 comments on commit 6c64e56

Please sign in to comment.