issue-193: Add Dynamo DB Table for SWOT Prior Lakes (#209)
* add code to handle prior lakes shapefiles, add test prior lake data

* update terraform to add prior lake table

* fix tests, change to smaller test data file, changelog

* linting

* reconfigure main load_data method to make more readable and pass linting

* lint

* lint

* fix string casting to lower storage req & update test responses to handle different rounding pattern in coords

* update load benchmarking function for linting and add unit test

* try parent collection for lakes

* update version parsing for parent collection

* fix case error

* fix lake id reference

* add logging to troubleshoot too large features

* add item size logging and remove error raise for batch write

* clean up logging statements & move numeric_columns assignment

* update batch logging statement

* Rename constant

* Fix temp dir security risk https://rules.sonarsource.com/python/RSPEC-5443/

* Fix temp dir security risk https://rules.sonarsource.com/python/RSPEC-5443/ (see the sketch after this list)

* fix code coverage calculation
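
A minimal sketch of the pattern behind the two "Fix temp dir security risk" commits above (Sonar rule RSPEC-5443): scratch files are created inside a private tempfile.TemporaryDirectory instead of at a predictable, world-writable '/tmp/<filename>' path. The helper name and the fetch callback are illustrative, not code from this commit.

import os
import tempfile

def download_to_scratch(filename, fetch):
    """Run fetch(local_path) against a path inside a private temp directory."""
    with tempfile.TemporaryDirectory() as scratch_dir:
        # The directory is created with mode 0o700 and removed on exit,
        # so other users cannot pre-create or read the download target.
        local_path = os.path.join(scratch_dir, filename)
        fetch(local_path)                      # e.g. an S3 download_file call
        return os.path.getsize(local_path)     # use the file before cleanup

# Usage: download_to_scratch("granule.zip", lambda p: open(p, "wb").write(b"data"))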

---------

Co-authored-by: Frank Greguska <[email protected]>
torimcd and frankinspace authored Aug 1, 2024
1 parent 4661966 commit 07ead89
Showing 23 changed files with 1,975 additions and 1,253 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Issue 198 - Implement track ingest lambda function CMR and Hydrocron queries
- Issue 193 - Add new Dynamo table for prior lake data
### Changed
### Deprecated
### Removed
hydrocron/db/io/swot_shp.py (renamed from swot_reach_node_shp.py)
@@ -1,8 +1,9 @@
"""
Unpacks SWOT Reach & Node Shapefiles
Unpacks SWOT Shapefiles
"""
import os.path
import json
import tempfile
from datetime import datetime, timezone
from importlib import resources
import xml.etree.ElementTree as ET
@@ -20,7 +21,7 @@

def read_shapefile(filepath, obscure_data, columns, s3_resource=None):
"""
Reads a SWOT River Reach shapefile packaged as a zip
Reads a SWOT shapefile packaged as a zip
Parameters
----------
@@ -41,58 +42,56 @@ def read_shapefile(filepath, obscure_data, columns, s3_resource=None):
to the database table
"""
filename = os.path.basename(filepath)
lambda_temp_file = '/tmp/' + filename

if filepath.startswith('s3'):
bucket_name, key = filepath.replace("s3://", "").split("/", 1)
logging.info("Opening granule %s from bucket %s", key, bucket_name)
with tempfile.TemporaryDirectory() as lambda_temp_dir_name:
lambda_temp_file = os.path.join(lambda_temp_dir_name, filename)
if filepath.startswith('s3'):
bucket_name, key = filepath.replace("s3://", "").split("/", 1)
logging.info("Opening granule %s from bucket %s", key, bucket_name)

s3_resource.Bucket(bucket_name).download_file(key, lambda_temp_file)
s3_resource.Bucket(bucket_name).download_file(key, lambda_temp_file)

shp_file = gpd.read_file('zip://' + lambda_temp_file)
with zipfile.ZipFile(lambda_temp_file) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))
shp_file = gpd.read_file('zip://' + lambda_temp_file)
with zipfile.ZipFile(lambda_temp_file) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))

elif filepath.startswith('https'):
_, bucket_name, key = filepath.replace("https://", "").split("/", 2)
logging.info("Opening granule %s from bucket %s", key, bucket_name)
elif filepath.startswith('https'):
_, bucket_name, key = filepath.replace("https://", "").split("/", 2)
logging.info("Opening granule %s from bucket %s", key, bucket_name)

s3_resource.Bucket(bucket_name).download_file(key, lambda_temp_file)
s3_resource.Bucket(bucket_name).download_file(key, lambda_temp_file)

shp_file = gpd.read_file('zip://' + lambda_temp_file)
with zipfile.ZipFile(lambda_temp_file) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))
else:
shp_file = gpd.read_file('zip://' + filepath)
with zipfile.ZipFile(filepath) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))
shp_file = gpd.read_file('zip://' + lambda_temp_file)
with zipfile.ZipFile(lambda_temp_file) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))
else:
shp_file = gpd.read_file('zip://' + filepath)
with zipfile.ZipFile(filepath) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))

numeric_columns = shp_file[columns].select_dtypes(include=[np.number]).columns
if obscure_data:
numeric_columns = shp_file[columns].select_dtypes(include=[np.number]).columns

shp_file[numeric_columns] = np.where(
(np.rint(shp_file[numeric_columns]) != -999) &
(np.rint(shp_file[numeric_columns]) != -99999999) &
(np.rint(shp_file[numeric_columns]) != -999999999999),
np.random.default_rng().integers(low=2, high=10)*shp_file[numeric_columns],
shp_file[numeric_columns])

shp_file = shp_file.astype(str)
filename_attrs = parse_from_filename(filename)

xml_attrs = parse_metadata_from_shpxml(shp_xml_tree)

attributes = filename_attrs | xml_attrs
items = assemble_attributes(shp_file, attributes)

if os.path.exists(lambda_temp_file):
os.remove(lambda_temp_file)

return items
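
A standalone sketch of the obscure_data branch in read_shapefile above: when obscuring is enabled, every numeric column is multiplied by a single random integer between 2 and 9, while the fill values -999, -99999999 and -999999999999 are left untouched. The small DataFrame below is made-up test data.

import numpy as np
import pandas as pd

df = pd.DataFrame({"wse": [12.5, -999999999999.0], "area_total": [3.2, 7.1]})
numeric_columns = df.select_dtypes(include=[np.number]).columns

# Scale real measurements by a random factor; keep fill values as-is.
df[numeric_columns] = np.where(
    (np.rint(df[numeric_columns]) != -999) &
    (np.rint(df[numeric_columns]) != -99999999) &
    (np.rint(df[numeric_columns]) != -999999999999),
    np.random.default_rng().integers(low=2, high=10) * df[numeric_columns],
    df[numeric_columns])

print(df)  # the fill value in 'wse' row 1 is preserved; the other values are scaled
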


def parse_metadata_from_shpxml(xml_elem):
"""
Read the SWORD version number from the shp.xml file
Read the prior database (SWORD or PLD) version number from the shp.xml file
and add to the database fields
Parameters
@@ -108,14 +107,16 @@ def parse_metadata_from_shpxml(xml_elem):
# get SWORD version
for globs in xml_elem.findall('global_attributes'):
prior_db_files = globs.find('xref_prior_river_db_files').text
metadata_attrs = {'sword_version': prior_db_files[-5:-3]}

metadata_attrs = {
'sword_version': prior_db_files[-5:-3]
}
# get PLD version
for globs in xml_elem.findall('global_metadata'):
prior_db_files = globs.find('xref_prior_lake_db_file').text
metadata_attrs = {'PLD_version': prior_db_files[-10:-7]}

# get units on fields that have them
for child in xml_elem:
if child.tag == 'attributes':
if child.tag in ('attributes', 'attribute_metadata'):
for field in child:
try:
units = field.find('units').text
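
A standalone sketch of the metadata parsing shown above, run against a made-up shp.xml fragment. The prior-lake database file name is invented purely so the [-10:-7] slice from the diff returns a readable value, and the printout only demonstrates that units can now be found under 'attribute_metadata' as well as 'attributes'; real LakeSP .shp.xml files carry many more fields.

import xml.etree.ElementTree as ET

SHP_XML = """
<swot_product>
  <global_metadata>
    <xref_prior_lake_db_file>SWOT_PLD_v103_SP.shp</xref_prior_lake_db_file>
  </global_metadata>
  <attribute_metadata>
    <lake_id><units>1</units></lake_id>
  </attribute_metadata>
</swot_product>
"""

root = ET.fromstring(SHP_XML)
metadata_attrs = {}

# PLD version comes from the prior lake database file name.
for globs in root.findall('global_metadata'):
    prior_db_file = globs.find('xref_prior_lake_db_file').text
    metadata_attrs['PLD_version'] = prior_db_file[-10:-7]

# Units live under 'attributes' (rivers) or 'attribute_metadata' (lakes).
for child in root:
    if child.tag in ('attributes', 'attribute_metadata'):
        for field in child:
            units = field.find('units')
            if units is not None:
                print(field.tag, 'units:', units.text)

print(metadata_attrs)  # {'PLD_version': '103'}
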
@@ -130,27 +131,29 @@
return metadata_attrs


def assemble_attributes(file_as_str, attributes):
def assemble_attributes(geodf, attributes):
"""
Helper function to concat file attributes to records
Parameters
----------
file_as_str : string
The file records as a string
geodf : geodataframe
The file records as a geodataframe
attributes : dict
A dictionary of attributes to concatenate
"""

items = []

for _index, row in file_as_str.iterrows():
# rework to use dataframe instead of file as string
for _index, row in geodf.iterrows():

shp_attrs = json.loads(
row.to_json(default_handler=str))

item_attrs = shp_attrs | attributes

item_attrs = {key: str(item_attrs[key]) for key in item_attrs.keys()}
items.append(item_attrs)

return items
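
A standalone sketch of what assemble_attributes now does per record: serialize the row, merge it with the granule-level attributes, and cast every value to str (the string cast is the storage-size fix mentioned in the commit messages). Plain pandas is used here for brevity; the real function iterates a GeoDataFrame.

import json
import pandas as pd

geodf = pd.DataFrame({"lake_id": ["1120000012"], "wse": [123.45]})
attributes = {"cycle_id": "018", "PLD_version": "103"}   # granule-level metadata

items = []
for _index, row in geodf.iterrows():
    shp_attrs = json.loads(row.to_json(default_handler=str))
    item_attrs = shp_attrs | attributes                   # dict union, Python 3.9+
    items.append({key: str(value) for key, value in item_attrs.items()})

print(items[0])  # every value is a str, ready for a DynamoDB put
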
@@ -177,14 +180,18 @@ def parse_from_filename(filename):
collection = ""
collection_version = ""

if 'Reach' in filename:
if 'RiverSP_Reach' in filename:
collection = constants.SWOT_REACH_COLLECTION_NAME
collection_version = constants.SWOT_REACH_COLLECTION_VERSION

if 'Node' in filename:
if 'RiverSP_Node' in filename:
collection = constants.SWOT_NODE_COLLECTION_NAME
collection_version = constants.SWOT_NODE_COLLECTION_VERSION

if 'LakeSP_Prior' in filename:
collection = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
collection_version = constants.SWOT_PRIOR_LAKE_COLLECTION_VERSION

filename_attrs = {
'cycle_id': filename_components[5],
'pass_id': filename_components[6],
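
A sketch of the filename handling in parse_from_filename after this change: the product token decides the collection (now matched on the full 'RiverSP_Reach', 'RiverSP_Node' or 'LakeSP_Prior' strings), and cycle and pass IDs are read from fixed underscore-separated positions. The granule name and collection labels below are illustrative, not real product names or constants.

filename = "SWOT_L2_HR_LakeSP_Prior_018_515_GR_20240101T000000_20240101T003000_PIC0_01.zip"

if 'LakeSP_Prior' in filename:
    collection = 'prior-lake-collection'     # placeholder for the real constant
elif 'RiverSP_Reach' in filename:
    collection = 'reach-collection'
elif 'RiverSP_Node' in filename:
    collection = 'node-collection'
else:
    collection = ''

filename_components = filename.split('_')
filename_attrs = {
    'cycle_id': filename_components[5],   # '018'
    'pass_id': filename_components[6],    # '515'
}
print(collection, filename_attrs)
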
82 changes: 51 additions & 31 deletions hydrocron/db/load_data.py
@@ -11,7 +11,7 @@
from botocore.exceptions import ClientError

from hydrocron.db import HydrocronTable
from hydrocron.db.io import swot_reach_node_shp
from hydrocron.db.io import swot_shp
from hydrocron.utils import connection
from hydrocron.utils import constants

@@ -49,6 +49,9 @@ def lambda_handler(event, _): # noqa: E501 # pylint: disable=W0613
case constants.SWOT_NODE_TABLE_NAME:
collection_shortname = constants.SWOT_NODE_COLLECTION_NAME
feature_type = 'Node'
case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
collection_shortname = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
feature_type = 'LakeSP_prior'
case constants.DB_TEST_TABLE_NAME:
collection_shortname = constants.SWOT_REACH_COLLECTION_NAME
feature_type = 'Reach'
@@ -95,14 +98,17 @@ def granule_handler(event, _):
if ("Node" in granule_path) & (table_name != constants.SWOT_NODE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Node data into table: '{table_name}'")

if ("LakeSP_Prior" in granule_path) & (table_name != constants.SWOT_PRIOR_LAKE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Prior Lake data into table: '{table_name}'")

logging.info("Value of load_benchmarking_data is: %s", load_benchmarking_data)

obscure_data = "true" in os.getenv("OBSCURE_DATA").lower()
logging.info("Value of obscure_data is: %s", obscure_data)

if load_benchmarking_data == "True":
logging.info("Loading benchmarking data")
items = swot_reach_node_shp.load_benchmarking_data()
items = swot_shp.load_benchmarking_data()
else:
logging.info("Setting up S3 connection")
s3_resource = connection.s3_resource
@@ -158,6 +164,18 @@ def cnm_handler(event, _):
InvocationType='Event',
Payload=event2)

if 'LakeSP_Prior' in granule_uri:
event2 = ('{"body": {"granule_path": "' + granule_uri
+ '","table_name": "' + constants.SWOT_PRIOR_LAKE_TABLE_NAME
+ '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')

logging.info("Invoking granule load lambda with event json %s", str(event2))

lambda_client.invoke(
FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'],
InvocationType='Event',
Payload=event2)
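
For readability, here is an equivalent of the event body the new LakeSP_Prior branch of cnm_handler builds by string concatenation, written with json.dumps. The URI and table name are illustrative placeholders, not values from a real deployment.

import json

granule_uri = "s3://example-bucket/SWOT_L2_HR_LakeSP_Prior_018_515_GR_20240101T000000_20240101T003000_PIC0_01.zip"
event2 = json.dumps({
    "body": {
        "granule_path": granule_uri,
        "table_name": "hydrocron-swot-prior-lake-table",   # stand-in for constants.SWOT_PRIOR_LAKE_TABLE_NAME
        "load_benchmarking_data": "False",
    }
})
print(event2)
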


def find_new_granules(collection_shortname, start_date, end_date):
"""
@@ -208,20 +226,28 @@ def read_data(granule_path, obscure_data, s3_resource=None):

if 'Reach' in granule_path:
logging.info("Start reading reach shapefile")
items = swot_reach_node_shp.read_shapefile(
items = swot_shp.read_shapefile(
granule_path,
obscure_data,
constants.REACH_DATA_COLUMNS,
s3_resource=s3_resource)

if 'Node' in granule_path:
logging.info("Start reading node shapefile")
items = swot_reach_node_shp.read_shapefile(
items = swot_shp.read_shapefile(
granule_path,
obscure_data,
constants.NODE_DATA_COLUMNS,
s3_resource=s3_resource)

if 'LakeSP_Prior' in granule_path:
logging.info("Start reading prior lake shapefile")
items = swot_shp.read_shapefile(
granule_path,
obscure_data,
constants.PRIOR_LAKE_DATA_COLUMNS,
s3_resource=s3_resource)

return items


@@ -247,33 +273,27 @@ def load_data(dynamo_resource, table_name, items):
raise MissingTable(f"Hydrocron table '{table_name}' does not exist.") from err
raise err

if hydrocron_table.table_name == constants.SWOT_REACH_TABLE_NAME:

if len(items) > 5:
logging.info("Batch adding %s reach items", len(items))
for i in range(5):
logging.info("Item reach_id: %s", items[i]['reach_id'])
hydrocron_table.batch_fill_table(items)

else:
logging.info("Adding reach items to table individually")
for item_attrs in items:
logging.info("Item reach_id: %s", item_attrs['reach_id'])
hydrocron_table.add_data(**item_attrs)

elif hydrocron_table.table_name == constants.SWOT_NODE_TABLE_NAME:

if len(items) > 5:
logging.info("Batch adding %s node items", len(items))
for i in range(5):
logging.info("Item node_id: %s", items[i]['node_id'])
hydrocron_table.batch_fill_table(items)
match hydrocron_table.table_name:
case constants.SWOT_REACH_TABLE_NAME:
feature_name = 'reach'
feature_id = feature_name + '_id'
case constants.SWOT_NODE_TABLE_NAME:
feature_name = 'node'
feature_id = feature_name + '_id'
case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
feature_name = 'prior_lake'
feature_id = 'lake_id'
case _:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)

else:
logging.info("Adding node items to table individually")
for item_attrs in items:
logging.info("Item node_id: %s", item_attrs['node_id'])
hydrocron_table.add_data(**item_attrs)
if len(items) > 5:
logging.info("Batch adding %s %s items. First 5 feature ids in batch: ", len(items), feature_name)
for i in range(5):
logging.info("Item %s: %s", feature_id, items[i][feature_id])
hydrocron_table.batch_fill_table(items)

else:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)
logging.info("Adding %s items to table individually", feature_name)
for item_attrs in items:
logging.info("Item %s: %s", feature_id, item_attrs[feature_id])
hydrocron_table.add_data(**item_attrs)
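
A standalone sketch of the dispatch the refactored load_data performs before the shared batch/individual loading path: one match statement maps the table name to a feature label and its ID field. Table names here are placeholders for the constants module, and the sketch raises on an unknown table where the diff only logs a warning.

def feature_fields(table_name: str) -> tuple[str, str]:
    match table_name:
        case 'hydrocron-swot-reach-table':
            return 'reach', 'reach_id'
        case 'hydrocron-swot-node-table':
            return 'node', 'node_id'
        case 'hydrocron-swot-prior-lake-table':
            return 'prior_lake', 'lake_id'
        case _:
            raise ValueError(f'file reader not implemented for table {table_name}')

feature_name, feature_id = feature_fields('hydrocron-swot-prior-lake-table')
print(feature_name, feature_id)  # prior_lake lake_id
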
17 changes: 15 additions & 2 deletions hydrocron/db/schema.py
@@ -2,6 +2,7 @@
Hydrocron Table module
"""
import logging
import sys
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

@@ -114,12 +115,24 @@ def batch_fill_table(self, items):
try:
with table.batch_writer() as writer:
for item in items:
writer.put_item(Item=item)
logger.info(
"Item %s size: %s",
item[self.partition_key_name],
str(sys.getsizeof(item))
)
if sys.getsizeof(item) < 300000:
writer.put_item(Item=item)
else:
logger.warning(
"Item too large, could not load: %s %s",
self.partition_key_name,
item[self.partition_key_name]
)
continue
logger.info("Loaded data into table %s.", table.name)

except ClientError:
logger.exception("Couldn't load data into table %s.", table.name)
raise

def run_query(self, partition_key, sort_key=None):
"""
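
A standalone sketch of the size guard added to batch_fill_table: each item's in-memory size is logged, and items over the threshold are skipped with a warning instead of failing the whole batch write. Note that sys.getsizeof reports only the dict container, so the 300000-byte cutoff is a rough proxy for DynamoDB's 400 KB item limit rather than the serialized item size.

import logging
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MAX_ITEM_BYTES = 300000

def writable_items(items, partition_key_name):
    """Yield items small enough to batch-write; log and skip the rest."""
    for item in items:
        size = sys.getsizeof(item)
        logger.info("Item %s size: %s", item[partition_key_name], size)
        if size < MAX_ITEM_BYTES:
            yield item
        else:
            logger.warning("Item too large, could not load: %s %s",
                           partition_key_name, item[partition_key_name])

small = list(writable_items([{"lake_id": "1120000012", "wse": "123.45"}], "lake_id"))
print(len(small))  # 1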