Skip to content

Commit

Permalink
rework constants to sustain version changes, update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
torimcd committed Oct 31, 2024
1 parent ccb68d4 commit d374573
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 222 deletions.
26 changes: 10 additions & 16 deletions hydrocron/db/io/swot_shp.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def read_shapefile(filepath, obscure_data, columns, s3_resource=None):
np.random.default_rng().integers(low=2, high=10)*shp_file[numeric_columns],
shp_file[numeric_columns])

filename_attrs = parse_from_filename(filename)
filename_attrs = parse_from_filename(filepath)

xml_attrs = parse_metadata_from_shpxml(shp_xml_tree)

Expand Down Expand Up @@ -204,38 +204,32 @@ def assemble_attributes(geodf, attributes):
return items


def parse_from_filename(filename):
def parse_from_filename(filepath):
"""
Parses the cycle, pass, start and end time from
the shapefile name and add to each item
Parameters
----------
filename : string
The string to parse
filepath : string
The full uri of the granule to parse
Returns
-------
filename_attrs : dict
A dictionary of attributes from the filename
"""
logging.info('Starting parse attributes from filename')

filename = os.path.basename(filepath)
filename_components = filename.split("_")

collection = ""
collection_version = ""

if 'RiverSP_Reach' in filename:
collection = constants.SWOT_REACH_COLLECTION_NAME
collection_version = constants.SWOT_REACH_COLLECTION_VERSION

if 'RiverSP_Node' in filename:
collection = constants.SWOT_NODE_COLLECTION_NAME
collection_version = constants.SWOT_NODE_COLLECTION_VERSION

if 'LakeSP_Prior' in filename:
collection = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
collection_version = constants.SWOT_PRIOR_LAKE_COLLECTION_VERSION
for table_info in constants.TABLE_COLLECTION_INFO:
if (table_info['feature_type'] in filename) & (table_info['collection_name'] in filepath):
collection = table_info['collection_name']

filename_attrs = {
'cycle_id': filename_components[5],
Expand Down Expand Up @@ -283,7 +277,7 @@ def load_benchmarking_data():
'continent_id': 'XX',
'range_end_time': '2024-12-31T23:59:00Z',
'crid': 'TEST',
'collection_shortname': constants.SWOT_REACH_COLLECTION_NAME
'collection_shortname': constants.TABLE_COLLECTION_INFO[0]['collection_name']
}

items = assemble_attributes(csv_file, filename_attrs)
Expand Down
143 changes: 35 additions & 108 deletions hydrocron/db/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ def lambda_handler(event, _): # noqa: E501 # pylint: disable=W0613
end_date = event['body']['end_date']
load_benchmarking_data = event['body']['load_benchmarking_data']

collection_shortname, track_table, feature_type, _ = get_collection_info_by_table(table_name)
for table_info in constants.TABLE_COLLECTION_INFO:
if table_info['table_name'] in table_name:
collection_shortname = table_info['collection_name']
track_table = table_info['track_table']
feature_type = table_info['feature_type']
break
else:
raise MissingTable(f"Error: Table does not exist: {table_name}")

logging.info("Searching for granules in collection %s", collection_shortname)

new_granules = find_new_granules(
Expand Down Expand Up @@ -88,8 +96,6 @@ def granule_handler(event, _):
Second Lambda entrypoint for loading individual granules
"""
granule_path = event['body']['granule_path']
table_name = event['body']['table_name']
track_table = event['body']['track_table']

load_benchmarking_data = event['body']['load_benchmarking_data']

Expand All @@ -105,17 +111,16 @@ def granule_handler(event, _):
revision_date = "Not Found"
logging.info('No CNM revision date')

if ("Reach" in granule_path) & (table_name != constants.SWOT_REACH_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Reach data into table: '{table_name}'")

if ("Node" in granule_path) & (table_name != constants.SWOT_NODE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Node data into table: '{table_name}'")

if ("LakeSP_Prior" in granule_path) & (table_name != constants.SWOT_PRIOR_LAKE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Prior Lake data into table: '{table_name}'")

if ("LakeSP_Obs" in granule_path) | ("LakeSP_Unassigned" in granule_path):
raise TableMisMatch(f"Error: Cannot load Observed or Unassigned Lake data into table: '{table_name}'")
raise MissingTable("Error: Cannot load Observed or Unassigned Lake data")

for table_info in constants.TABLE_COLLECTION_INFO:
if (table_info['collection_name'] in granule_path) & (table_info['feature_type'] in granule_path):
table_name = table_info['table']
track_table = table_info['track_table']
break
else:
raise MissingTable(f"Error: Cannot load granule: {granule_path}, no support for this collection")

logging.info("Value of load_benchmarking_data is: %s", load_benchmarking_data)

Expand Down Expand Up @@ -170,7 +175,13 @@ def cnm_handler(event, _):
granule_uri = files['uri']
checksum = files['checksum']

table_name, track_table = get_table_info_by_granule(granule_uri)
for table_info in constants.TABLE_COLLECTION_INFO:
if (table_info['collection_name'] in granule_uri) & (table_info['feature_type'] in granule_uri):
table_name = table_info['table_name']
track_table = table_info['track_table']
break
else:
raise MissingTable(f"Error: Cannot load granule: {granule_uri}")

event2 = ('{"body": {"granule_path": "' + granule_uri
+ '","table_name": "' + table_name
Expand All @@ -187,86 +198,6 @@ def cnm_handler(event, _):
Payload=event2)


def get_collection_info_by_table(table_name):
    """
    Returns the collection name, track ingest table name, feature type,
    and feature id for the given table

    Parameters
    ----------
    table_name : string
        the name of the hydrocron db table

    Returns
    -------
    collection_shortname : string
        the collection shortname
    track_table : string
        the track ingest table associated with the feature table
    feature_type : string
        the type of feature in the table
    feature_id : string
        the feature id field for the feature type in the table

    Raises
    ------
    MissingTable
        if no entry in constants.TABLE_COLLECTION_INFO matches table_name
    """
    # Iterate TABLE_COLLECTION_INFO as a list of dicts, consistent with the
    # other handlers in this module. This also fixes two bugs in the prior
    # version: the table_name parameter was clobbered by `table_name = ''`
    # before the lookup ran, and the `except MissingTable` clause was dead
    # code (the lookup could only raise KeyError/TypeError), leaving
    # track_table/feature_id potentially unbound on failure.
    for table_info in constants.TABLE_COLLECTION_INFO:
        if table_info['table_name'] == table_name:
            return (table_info['collection_name'],
                    table_info['track_table'],
                    table_info['feature_type'],
                    table_info['feature_id'])

    # Raise explicitly on no match so callers never receive empty or
    # unbound results — this was the original intent of the dead handler.
    raise MissingTable(f"Hydrocron table '{table_name}' does not exist.")


def get_table_info_by_granule(granule_uri):
    """
    Returns the feature table name and the track ingest table name for
    the given granule

    Parameters
    ----------
    granule_uri : string
        the uri to the granule being processed

    Returns
    -------
    table_name : string
        the hydrocron feature table name, or '' if the granule matches no
        configured collection/feature combination
    track_table : string
        the track ingest table associated with the feature table, or ''
        if no match
    """
    # Single pass over the lookup entries, matching both the collection
    # shortname and the feature type against the granule uri. The prior
    # version nested loops over every (shortname, feature) combination,
    # re-filtered the whole table each time without breaking out, and
    # wrapped the lookup in a dead `except MissingTable` that could never
    # trigger.
    for table_info in constants.TABLE_COLLECTION_INFO:
        if (table_info['collection_name'] in granule_uri
                and table_info['feature_type'] in granule_uri):
            return table_info['table_name'], table_info['track_table']

    # Preserve the original contract: empty strings when the granule does
    # not correspond to any configured collection.
    return '', ''


def find_new_granules(collection_shortname, start_date, end_date):
"""
Find granules to ingest
Expand Down Expand Up @@ -369,21 +300,17 @@ def load_data(dynamo_resource, table_name, items):
raise MissingTable(f"Hydrocron table '{table_name}' does not exist.") from err
raise err

match hydrocron_table.table_name:
case constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest reaches'
feature_id = 'granuleUR'
case constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest nodes'
for table_info in constants.TABLE_COLLECTION_INFO:
if hydrocron_table.table_name in table_info['track_table']:
feature_name = 'track ingest ' + str.lower(table_info['feature_type'])
feature_id = 'granuleUR'
case constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest prior lakes'
feature_id = 'granuleUR'
case _:
try:
_, _, feature_name, feature_id = get_collection_info_by_table(hydrocron_table.table_name)
except MissingTable:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)
break
elif hydrocron_table.table_name in table_info['table_name']:
feature_name = table_info['feature_type']
feature_id = table_info['feature_id']
break
else:
raise MissingTable(f'Items cannot be parsed, file reader not implemented for table {hydrocron_table.table_name}')

if len(items) > 5:
logging.info("Batch adding %s %s items. First 5 feature ids in batch: ", len(items), feature_name)
Expand Down
18 changes: 7 additions & 11 deletions hydrocron/db/track_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,17 +401,13 @@ def track_ingest_handler(event, context):
reprocessed_crid = event["reprocessed_crid"]
temporal = "temporal" in event.keys()

if ("reach" in collection_shortname) and ((hydrocron_table != constants.SWOT_REACH_TABLE_NAME)
or (hydrocron_track_table != constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME)):
raise TableMisMatch(f"Error: Cannot query reach data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")

if ("node" in collection_shortname) and ((hydrocron_table != constants.SWOT_NODE_TABLE_NAME)
or (hydrocron_track_table != constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME)):
raise TableMisMatch(f"Error: Cannot query node data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")

if ("prior" in collection_shortname) and ((hydrocron_table != constants.SWOT_PRIOR_LAKE_TABLE_NAME)
or (hydrocron_track_table != constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME)):
raise TableMisMatch(f"Error: Cannot query prior lake data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")
for table_info in constants.TABLE_COLLECTION_INFO:
if (table_info['collection_name'] in collection_shortname) & (str.lower(table_info['feature_type']) in collection_shortname):
hydrocron_table = table_info['table_name']
hydrocron_track_table = table_info['track_table']
break
else:
raise TableMisMatch(f"Error: Cannot query data for tables: '{hydrocron_table}' and '{hydrocron_track_table}'")

if temporal:
query_start = datetime.datetime.strptime(event["query_start"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
Expand Down
Loading

0 comments on commit d374573

Please sign in to comment.