
Clean up output dir validation #67

Merged (3 commits) on Feb 15, 2024
buildstockbatch/cloud/docker_base.py (9 changes: 6 additions & 3 deletions)

@@ -558,9 +558,12 @@ def find_missing_tasks(self, expected):
         fs = self.get_fs()
         done_tasks = set()

-        for f in fs.ls(f"{self.results_dir}/simulation_output/"):
-            if m := re.match(".*results_job(\\d*).json.gz$", f):
-                done_tasks.add(int(m.group(1)))
+        try:
+            for f in fs.ls(f"{self.results_dir}/simulation_output/"):
+                if m := re.match(".*results_job(\\d*).json.gz$", f):
+                    done_tasks.add(int(m.group(1)))
+        except FileNotFoundError:
+            logger.warning("No completed task files found. Running all tasks.")

         missing_tasks = []
         with fs.open(f"{self.results_dir}/missing_tasks.txt", "w") as f:
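The new try/except covers the first-run case: fs.ls raises FileNotFoundError when simulation_output/ does not exist yet, and the job should then treat every task as not done rather than crash. A minimal standalone sketch of the same guard, assuming an fsspec-style filesystem object; the function name and logger setup are illustrative, not from the PR:

```python
import logging
import re

logger = logging.getLogger(__name__)

def collect_done_tasks(fs, results_dir):
    """Return the indices of tasks that already wrote a results file."""
    done_tasks = set()
    try:
        for f in fs.ls(f"{results_dir}/simulation_output/"):
            if m := re.match(r".*results_job(\d*)\.json\.gz$", f):
                done_tasks.add(int(m.group(1)))
    except FileNotFoundError:
        # First run: the output directory does not exist yet, so every
        # task still needs to run.
        logger.warning("No completed task files found. Running all tasks.")
    return done_tasks
```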
buildstockbatch/gcp/gcp.py (99 changes: 54 additions & 45 deletions)

@@ -190,8 +190,56 @@ def get_AR_repo_name(gcp_project, region, repo):
         """Returns the full name of a repository in Artifact Registry."""
         return f"projects/{gcp_project}/locations/{region}/repositories/{repo}"

+    def check_output_dir(self):
+        """Check for existing results files in the output directory."""
+        storage_client = storage.Client(project=self.gcp_project)
+        output_dir = os.path.join(self.cfg["gcp"]["gcs"]["prefix"], "results", "simulation_output")
+        bucket = storage_client.bucket(self.gcs_bucket)
+        blobs = bucket.list_blobs(prefix=os.path.join(output_dir, "results_job"))
+
+        try:
+            blob = next(blobs)
+        except StopIteration:
+            return
+
+        prefix_for_deletion = self.cfg["gcp"]["gcs"]["prefix"]
+        blobs_for_deletion = bucket.list_blobs(prefix=prefix_for_deletion)
+        user_choice = (
+            input(
+                f"Output files are already present in bucket {self.gcs_bucket}! For example, {blob.name} exists. "
+                f"Do you want to permanently delete all the files in {prefix_for_deletion}? (yes/no): "
+            )
+            .strip()
+            .lower()
+        )
+
+        if user_choice == "yes":
+            logger.info("Deleting files...")
+            try:
+                blobs_for_deletion_object = [blob for blob in blobs_for_deletion]
+                bucket.delete_blobs(blobs_for_deletion_object)
+            except Exception as e:
+                logger.error(f"Failed to delete files: {e}")
+
+            # Confirm deletion
+            remaining_blobs = list(bucket.list_blobs(prefix=prefix_for_deletion))
+            if not remaining_blobs:
+                logger.info("All specified files have been confirmed deleted. You can now proceed with your operation.")
+            else:
+                logger.warning(
+                    "Deletion confirmed for some files, but some still remain. Please check GCS for details."
+                )
+        elif user_choice == "no":
+            raise ValidationError(
+                f"Output files are already present in bucket {self.gcs_bucket}! For example, {blob.name} exists. "
+                "If you do not wish to delete them choose a different file prefix. "
+                f"https://console.cloud.google.com/storage/browser/{self.gcs_bucket}/{prefix_for_deletion}"
+            )
+        else:
+            raise ValidationError("Invalid input!")
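check_output_dir deletes everything under the configured prefix and then re-lists to verify the deletion went through. That delete-and-confirm idiom, pulled out as a standalone sketch with an illustrative function name (not part of the PR):

```python
from google.cloud import storage

def delete_prefix(bucket: storage.Bucket, prefix: str) -> bool:
    """Delete every object under prefix; return True if none remain."""
    blobs = list(bucket.list_blobs(prefix=prefix))
    if blobs:
        bucket.delete_blobs(blobs)
    # Re-list to confirm the deletion actually took effect before the
    # caller proceeds with a new run.
    return not list(bucket.list_blobs(prefix=prefix))
```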

     @staticmethod
-    def validate_gcp_args(project_file, missing_only):
+    def validate_gcp_args(project_file):
         cfg = get_project_configuration(project_file)
         assert "gcp" in cfg, 'Project config must contain a "gcp" section'
         gcp_project = cfg["gcp"]["project"]
@@ -209,50 +257,9 @@ def validate_gcp_args(project_file, missing_only):

         # Check that GCP bucket exists
         bucket = cfg["gcp"]["gcs"]["bucket"]
-        output_dir = os.path.join(cfg["gcp"]["gcs"]["prefix"], "results", "simulation_output")
         storage_client = storage.Client(project=gcp_project)
         assert storage_client.bucket(bucket).exists(), f"GCS bucket {bucket} does not exist in project {gcp_project}"

-        blobs_exist = False
-        blobs = storage_client.bucket(bucket).list_blobs(prefix=os.path.join(output_dir, "results_job"))
-
-        for blob in blobs:
-            blobs_exist = True
-            break
-
-        if not missing_only:
-            if blobs_exist:
-                prefix_for_deletion = cfg["gcp"]["gcs"]["prefix"]
-                blobs_for_deletion = storage_client.bucket(bucket).list_blobs(prefix=prefix_for_deletion)
-
-                user_choice = input(f"Output files are already present in bucket {bucket}! For example, {blob.name} exists. "
-                                    f"Do you want to delete all the files in {prefix_for_deletion}? (yes/no): ").strip().lower()
-
-                if user_choice == "yes":
-                    try:
-                        blobs_for_deletion_object = [blob for blob in blobs_for_deletion]
-                        storage_client.bucket(bucket).delete_blobs(blobs_for_deletion_object)
-                    except Exception as e:
-                        print(f"Failed to delete files: {e}")
-
-                    # Confirm deletion
-                    remaining_blobs = list(storage_client.bucket(bucket).list_blobs(prefix=prefix_for_deletion))
-                    if not remaining_blobs:
-                        print("All specified files have been confirmed deleted. You can now proceed with your operation.")
-                    else:
-                        print(f"Deletion confirmed for some files, but some still remain. Please check GCS for details.")
-
-                elif user_choice == "no":
-                    raise ValidationError(
-                        f"Output files are already present in bucket {bucket}! For example, {blob.name} exists. "
-                        "If you do not wish to delete them choose a different file prefix. "
-                        f"https://console.cloud.google.com/storage/browser/{bucket}/{prefix_for_deletion}"
-                    )
-
-                else:
-                    raise ValidationError(
-                        f"Invalid input!")

         # Check that artifact registry repository exists
         repo = cfg["gcp"]["artifact_registry"]["repository"]
         ar_client = artifactregistry_v1.ArtifactRegistryClient()
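The removed for/break loop was a probe for "does any object exist under this prefix"; the new check_output_dir expresses the same test with next() and StopIteration. As a standalone sketch, with an illustrative function name and a max_results cap so at most one object is fetched:

```python
from google.cloud import storage

def any_blob_exists(client: storage.Client, bucket_name: str, prefix: str) -> bool:
    """Return True as soon as one object is found under the prefix."""
    blobs = client.bucket(bucket_name).list_blobs(prefix=prefix, max_results=1)
    try:
        next(iter(blobs))
    except StopIteration:
        return False
    return True
```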
@@ -291,9 +298,9 @@ def validate_gcp_args(project_file, missing_only):
         )

     @staticmethod
-    def validate_project(project_file, missing_only=False):
+    def validate_project(project_file):
         super(GcpBatch, GcpBatch).validate_project(project_file)
-        GcpBatch.validate_gcp_args(project_file,missing_only)
+        GcpBatch.validate_gcp_args(project_file)

     @property
     def docker_image(self):
@@ -1226,7 +1233,7 @@ def main():
     logger.setLevel(logging.INFO)

     # validate the project, and if --validateonly flag is set, return True if validation passes
-    GcpBatch.validate_project(os.path.abspath(args.project_filename), args.missingonly)
+    GcpBatch.validate_project(os.path.abspath(args.project_filename))
Reviewer comment on this line: should we also add args.missingonly here and avoid checking for that in the check_output_dir?

Author reply (collaborator): I'm not sure if this is what you meant, but I moved the missingonly check out of check_output_dir and put it below.

     if args.validateonly:
         return True

@@ -1246,6 +1253,8 @@ def main():
     else:
         if batch.check_for_existing_jobs():
             return
+        if not args.missingonly:
+            batch.check_output_dir()

         batch.build_image()
         batch.push_image()
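For context, args.missingonly and args.validateonly imply boolean CLI flags on the GCP entry point. A hedged sketch of how that wiring could look; the flag names match the diff, but the actual parser in gcp.py may be structured differently:

```python
import argparse

# Illustrative only: the real buildstockbatch CLI may use subcommands
# or different help text.
parser = argparse.ArgumentParser(description="Run buildstockbatch on GCP")
parser.add_argument("project_filename", help="Path to the project YAML file")
parser.add_argument(
    "--missingonly",
    action="store_true",
    help="Only run tasks with no results file yet; skips the existing-output check.",
)
parser.add_argument(
    "--validateonly",
    action="store_true",
    help="Validate the project file and exit without running.",
)
args = parser.parse_args()
```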