Catch single daily/monthly failures and continue conversions
stijnvanhoey committed Aug 21, 2023
1 parent 4517a47 commit f56cbdd
Showing 1 changed file with 77 additions and 69 deletions.
146 changes: 77 additions & 69 deletions src/vptstools/bin/vph5_to_vpts.py
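
The commit message above describes the change: when the conversion of a single day or month fails, the error is reported and the loop moves on to the next item instead of aborting the whole run. A minimal sketch of that per-item try/except pattern (the names convert_all, convert_one and items are illustrative, not part of the commit):

    import click


    def convert_all(items, convert_one):
        """Apply convert_one to every item; log failures and continue instead of aborting."""
        failures = []
        for item in items:
            try:
                convert_one(item)
            except Exception as exc:
                # A single failing item no longer stops the batch: report it and move on.
                click.echo(f"Conversion of {item} failed: {exc}")
                failures.append(item)
        return failures

The diff below applies this pattern to both the daily and the monthly vpts loops in vph5_to_vpts.py.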
@@ -13,7 +13,7 @@
 
 from vptstools.vpts import vpts, vpts_to_csv
 from vptstools.s3 import handle_manifest, OdimFilePath
-from vptstools.bin.click_exception import catch_all_exceptions, report_exception_to_sns
+from vptstools.bin.click_exception import catch_all_exceptions, report_click_exception_to_sns
 
 # Load environmental variables from file in dev
 # (load_dotenv doesn't override existing environment variables)
@@ -31,15 +31,15 @@
 
 
 # Prepare SNS report handler
-report_sns = partial(report_exception_to_sns,
-                     aws_sns_topic=AWS_SNS_TOPIC,
-                     subject="Conversion from hdf5 files to daily/monthly vpts-files failed.",
-                     profile_name=AWS_PROFILE,
-                     region_name=AWS_REGION
-                     )
+sns_report_exception = partial(report_click_exception_to_sns,
+                               aws_sns_topic=AWS_SNS_TOPIC,
+                               subject="Conversion from hdf5 files to daily/monthly vpts-files failed.",
+                               profile_name=AWS_PROFILE,
+                               region_name=AWS_REGION
+                               )
 
 
-@click.command(cls=catch_all_exceptions(click.Command, handler=report_sns)) # Add SNS-reporting on exception
+@click.command(cls=catch_all_exceptions(click.Command, handler=sns_report_exception)) # Add SNS-reporting on exception
 @click.option(
     "--modified-days-ago",
     "modified_days_ago",
@@ -101,42 +101,46 @@ def cli(modified_days_ago):
 
     click.echo(f"Create {days_to_create_vpts.shape[0]} daily vpts files.")
     for j, daily_vpts in enumerate(days_to_create_vpts["directory"]):
-        # Enlist files of the day to rerun (all the given day)
-        source, _, radar_code, year, month, day = daily_vpts
-        odim_path = OdimFilePath(source, radar_code, "vp", year, month, day)
-        odim5_files = inbo_s3.ls(f"{S3_BUCKET}/{odim_path.s3_folder_path_h5}")
-        click.echo(f"Create daily vpts file {odim_path.s3_file_path_daily_vpts}.")
-        # - create tempdir
-        temp_folder_path = Path(tempfile.mkdtemp())
-
-        # - download the files of the day
-        h5_file_local_paths = []
-        for i, file_key in enumerate(odim5_files):
-            h5_path = OdimFilePath.from_s3fs_enlisting(file_key)
-            h5_local_path = str(temp_folder_path / h5_path.file_name)
-            # inbo_s3.get_file(file_key, h5_local_path)
-            # s3fs fails in wrapped moto environment; fall back to boto3
-            s3_client.download_file(
-                S3_BUCKET,
-                f"{h5_path.s3_folder_path_h5}/{h5_path.file_name}",
-                h5_local_path,
-            )
-            h5_file_local_paths.append(h5_local_path)
+        try:
+            # Enlist files of the day to rerun (all the given day)
+            source, _, radar_code, year, month, day = daily_vpts
+            odim_path = OdimFilePath(source, radar_code, "vp", year, month, day)
+            odim5_files = inbo_s3.ls(f"{S3_BUCKET}/{odim_path.s3_folder_path_h5}")
+            click.echo(f"Create daily vpts file {odim_path.s3_file_path_daily_vpts}.")
+            # - create tempdir
+            temp_folder_path = Path(tempfile.mkdtemp())
+
+            # - download the files of the day
+            h5_file_local_paths = []
+            for i, file_key in enumerate(odim5_files):
+                h5_path = OdimFilePath.from_s3fs_enlisting(file_key)
+                h5_local_path = str(temp_folder_path / h5_path.file_name)
+                # inbo_s3.get_file(file_key, h5_local_path)
+                # s3fs fails in wrapped moto environment; fall back to boto3
+                s3_client.download_file(
+                    S3_BUCKET,
+                    f"{h5_path.s3_folder_path_h5}/{h5_path.file_name}",
+                    h5_local_path,
+                )
+                h5_file_local_paths.append(h5_local_path)
 
-        # - run vpts on all locally downloaded files
-        df_vpts = vpts(h5_file_local_paths)
+            # - run vpts on all locally downloaded files
+            df_vpts = vpts(h5_file_local_paths)
 
-        # - save vpts file locally
-        vpts_to_csv(df_vpts, temp_folder_path / odim_path.daily_vpts_file_name)
+            # - save vpts file locally
+            vpts_to_csv(df_vpts, temp_folder_path / odim_path.daily_vpts_file_name)
 
-        # - copy vpts file to S3
-        inbo_s3.put(
-            str(temp_folder_path / odim_path.daily_vpts_file_name),
-            f"{S3_BUCKET}/{odim_path.s3_file_path_daily_vpts}",
-        )
+            # - copy vpts file to S3
+            inbo_s3.put(
+                str(temp_folder_path / odim_path.daily_vpts_file_name),
+                f"{S3_BUCKET}/{odim_path.s3_file_path_daily_vpts}",
+            )
 
-        # - remove tempdir with local files
-        shutil.rmtree(temp_folder_path)
+            # - remove tempdir with local files
+            shutil.rmtree(temp_folder_path)
+        except Exception as exc:
+            click.echo(f"During conversion from h5 files of {source}/{radar_code} at "
+                       f"{year}-{month}-{day} to daily vpts file, the following error occurred: {exc}.")
 
     click.echo("Finished creating daily vpts files.")
 
@@ -152,35 +156,39 @@ def cli(modified_days_ago):
 
     click.echo(f"Create {months_to_create_vpts.shape[0]} monthly vpts files.")
    for j, monthly_vpts in enumerate(months_to_create_vpts["directory"]):
-        source, _, radar_code, year, month = monthly_vpts
-        odim_path = OdimFilePath(source, radar_code, "vp", year, month, "01")
-
-        click.echo(f"Create monthly vpts file {odim_path.s3_file_path_monthly_vpts}.")
-        file_list = inbo_s3.ls(f"{S3_BUCKET}/{odim_path.s3_path_setup('daily')}")
-        files_to_concat = sorted(
-            [
-                daily_vpts
-                for daily_vpts in file_list
-                if daily_vpts.find(f"{odim_path.year}{odim_path.month}") >= 0
-            ]
-        )
-        # do not parse NaN values, but keep all data as string
-        df_month = pd.concat(
-            [
-                pd.read_csv(
-                    f"s3://{file_path}",
-                    dtype=str,
-                    keep_default_na=False,
-                    na_values=None,
-                )
-                for file_path in files_to_concat
-            ]
-        )
-        df_month.to_csv(
-            f"s3://{S3_BUCKET}/{odim_path.s3_file_path_monthly_vpts}",
-            index=False,
-            storage_options=storage_options,
-        )
+        try:
+            source, _, radar_code, year, month = monthly_vpts
+            odim_path = OdimFilePath(source, radar_code, "vp", year, month, "01")
+
+            click.echo(f"Create monthly vpts file {odim_path.s3_file_path_monthly_vpts}.")
+            file_list = inbo_s3.ls(f"{S3_BUCKET}/{odim_path.s3_path_setup('daily')}")
+            files_to_concat = sorted(
+                [
+                    daily_vpts
+                    for daily_vpts in file_list
+                    if daily_vpts.find(f"{odim_path.year}{odim_path.month}") >= 0
+                ]
+            )
+            # do not parse NaN values, but keep all data as string
+            df_month = pd.concat(
+                [
+                    pd.read_csv(
+                        f"s3://{file_path}",
+                        dtype=str,
+                        keep_default_na=False,
+                        na_values=None,
+                    )
+                    for file_path in files_to_concat
+                ]
+            )
+            df_month.to_csv(
+                f"s3://{S3_BUCKET}/{odim_path.s3_file_path_monthly_vpts}",
+                index=False,
+                storage_options=storage_options,
+            )
+        except Exception as exc:
+            click.echo(f"During conversion from h5 files of {source}/{radar_code} at "
+                       f"{year}-{month} to monthly vpts file, the following error occurred: {exc}.")
 
     click.echo("Finished creating monthly vpts files.")
     click.echo("Finished vpts update procedure.")
