Skip to content

Commit

Permalink
fix normalization increasing state bug
Browse files Browse the repository at this point in the history
  • Loading branch information
geeli123 committed Nov 14, 2024
1 parent 44f90d0 commit 18a5c78
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
str(cur_processing.day),
)
LOGGER.info(f"Processing date: {date_partition}")
max_timestamp = cur_processing.timestamp()
max_epoch_timestamp = cur_processing.timestamp()

trip_updates_pa, vehicles_pa, alerts_pa = None, None, None

Expand All @@ -119,8 +119,8 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
key = obj["Key"]
if key.endswith(".binpb"):
# Check if this file is newer than the last processed file
file_write_time = float(key.split("/")[-1].removesuffix(".binpb"))
if file_write_time > cur_processing.timestamp():
file_write_epoch_time = float(key.split("/")[-1].removesuffix(".binpb"))
if file_write_epoch_time > cur_processing.timestamp():
LOGGER.info(f"Processing file: {key}")
# Download the file
response = s3.get_object(Bucket=source_bucket, Key=key)
Expand Down Expand Up @@ -150,33 +150,37 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
else cur_alerts_pa
)

max_timestamp = max(max_timestamp, file_write_time)
max_epoch_timestamp = max(max_epoch_timestamp, file_write_epoch_time)

if trip_updates_pa:
s3_uri = f"{destination_prefix}/trip-updates"
LOGGER.info(f"Writing {trip_updates_pa.num_rows} entries to {s3_uri}")
write_data(s3_fs, trip_updates_pa, s3_uri)
trip_updates_pa = None
if vehicles_pa:
s3_uri = f"{destination_prefix}/vehicles"
LOGGER.info(f"Writing {vehicles_pa.num_rows} entries to {s3_uri}")
write_data(s3_fs, vehicles_pa, s3_uri)
vehicles_pa = None
if alerts_pa:
s3_uri = f"{destination_prefix}/alerts"
LOGGER.info(f"Writing {alerts_pa.num_rows} entries to {s3_uri}")
write_data(s3_fs, alerts_pa, s3_uri)
alerts_pa = None

# Update the last processed timestamp
if max_timestamp == last_processed:
if max_epoch_timestamp == last_processed:
LOGGER.warning(
f"No data found in partition: {date_partition} "
f"- is this expected?"
)
LOGGER.info(
f"Updating last processed timestamp to "
f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_timestamp).isoformat()}"
f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_epoch_timestamp).isoformat()}"
)
update_last_processed_timestamp(s3, state_bucket, state_key, max_timestamp)
update_last_processed_timestamp(s3, state_bucket, state_key, max_epoch_timestamp)
cur_processing = (cur_processing + dt.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
LOGGER.info(f"Processed all files up to {cur_time}")


@click.command()
Expand Down

0 comments on commit 18a5c78

Please sign in to comment.