diff --git a/raster_loader/cli/bigquery.py b/raster_loader/cli/bigquery.py
index 31d8b15..089ce46 100644
--- a/raster_loader/cli/bigquery.py
+++ b/raster_loader/cli/bigquery.py
@@ -119,7 +119,9 @@ def upload(
 
     # create default table name if not provided
     if table is None:
-        table = get_default_table_name(file_path if is_local_file else urlparse(file_url).path, band)
+        table = get_default_table_name(
+            file_path if is_local_file else urlparse(file_url).path, band
+        )
 
     credentials = None
     if token is not None:
@@ -170,7 +172,12 @@ def upload(
 @click.option("--dataset", help="The name of the dataset.", required=True)
 @click.option("--table", help="The name of the table.", required=True)
 @click.option("--limit", help="Limit number of rows returned", default=10)
-@click.option("--token", help="An access token to authenticate with.", required=False, default=None)
+@click.option(
+    "--token",
+    help="An access token to authenticate with.",
+    required=False,
+    default=None,
+)
 def describe(project, dataset, table, limit, token):
     credentials = None
     if token is not None:
diff --git a/raster_loader/cli/snowflake.py b/raster_loader/cli/snowflake.py
index 965e531..6a25b10 100644
--- a/raster_loader/cli/snowflake.py
+++ b/raster_loader/cli/snowflake.py
@@ -34,10 +34,19 @@ def snowflake(args=None):
 @click.option("--account", help="The Snowflake account.", required=True)
 @click.option("--username", help="The username.", required=False, default=None)
 @click.option("--password", help="The password.", required=False, default=None)
-@click.option("--token", help="An access token to authenticate with.", required=False, default=None)
+@click.option(
+    "--token",
+    help="An access token to authenticate with.",
+    required=False,
+    default=None,
+)
 @click.option("--role", help="The role to use for the file upload.", default=None)
-@click.option("--file_path", help="The path to the raster file.", required=False, default=None)
-@click.option("--file_url", help="The path to the raster file.", required=False, default=None)
+@click.option(
+    "--file_path", help="The path to the raster file.", required=False, default=None
+)
+@click.option(
+    "--file_url", help="The URL of the raster file.", required=False, default=None
+)
 @click.option("--database", help="The name of the database.", required=True)
 @click.option("--schema", help="The name of the schema.", required=True)
 @click.option("--table", help="The name of the table.", default=None)
@@ -77,6 +86,9 @@ def snowflake(args=None):
     default=False,
     is_flag=True,
 )
+@click.option(
+    "--timestamp", help="The timestamp value to attach as a column.", default=None
+)
 @catch_exception()
 def upload(
     account,
@@ -95,6 +107,7 @@ def upload(
     overwrite=False,
     append=False,
     cleanup_on_failure=False,
+    timestamp=None,
 ):
     from raster_loader.io.common import (
         get_number_of_blocks,
         rasterio_metadata,
         get_block_dims,
     )
 
-    if (token is None and (username is None or password is None)) or all(v is not None for v in [token, username, password]):
-        raise ValueError("Either --token or --username and --password must be provided.")
+    if (token is None and (username is None or password is None)) or all(
+        v is not None for v in [token, username, password]
+    ):
+        raise ValueError(
+            "Either --token or --username and --password must be provided."
+        )
 
     if file_path is None and file_url is None:
         raise ValueError("Either --file_path or --file_url must be provided.")
@@ -126,7 +143,9 @@ def upload(
 
     # create default table name if not provided
     if table is None:
-        table = get_default_table_name(file_path if is_local_file else urlparse(file_url).path, band)
+        table = get_default_table_name(
+            file_path if is_local_file else urlparse(file_url).path, band
+        )
 
     connector = SnowflakeConnection(
         username=username,
@@ -158,6 +177,7 @@ def upload(
     click.echo("Schema: {}".format(schema))
     click.echo("Table: {}".format(table))
     click.echo("Number of Records Per Snowflake Append: {}".format(chunk_size))
+    click.echo("Timestamp: {}".format(timestamp))
 
     click.echo("Uploading Raster to Snowflake")
 
@@ -170,6 +190,7 @@ def upload(
         overwrite=overwrite,
         append=append,
         cleanup_on_failure=cleanup_on_failure,
+        timestamp=timestamp,
     )
 
     click.echo("Raster file uploaded to Snowflake")
@@ -180,7 +201,12 @@ def upload(
 @click.option("--account", help="The Snowflake account.", required=True)
 @click.option("--username", help="The username.", required=False, default=None)
 @click.option("--password", help="The password.", required=False, default=None)
-@click.option("--token", help="An access token to authenticate with.", required=False, default=None)
+@click.option(
+    "--token",
+    help="An access token to authenticate with.",
+    required=False,
+    default=None,
+)
 @click.option("--role", help="The role to use for the file upload.", default=None)
 @click.option("--database", help="The name of the database.", required=True)
 @click.option("--schema", help="The name of the schema.", required=True)
@@ -188,8 +214,12 @@ def upload(
 @click.option("--limit", help="Limit number of rows returned", default=10)
 def describe(account, username, password, token, role, database, schema, table, limit):
 
-    if (token is None and (username is None or password is None)) or all(v is not None for v in [token, username, password]):
-        raise ValueError("Either --token or --username and --password must be provided.")
+    if (token is None and (username is None or password is None)) or all(
+        v is not None for v in [token, username, password]
+    ):
+        raise ValueError(
+            "Either --token or --username and --password must be provided."
+        )
 
     fqn = f"{database}.{schema}.{table}"
     connector = SnowflakeConnection(
diff --git a/raster_loader/io/snowflake.py b/raster_loader/io/snowflake.py
index 9ef5876..576bec1 100644
--- a/raster_loader/io/snowflake.py
+++ b/raster_loader/io/snowflake.py
@@ -4,6 +4,8 @@
 
 from typing import Iterable, List, Tuple
 
+from datetime import datetime
+
 from raster_loader.errors import (
     IncompatibleRasterException,
     import_error_snowflake,
@@ -28,6 +30,17 @@
 else:
     _has_snowflake = True
 
+
+def parse_timestamp_to_int(timestamp_str):
+    # Parse the timestamp string into a datetime object
+    dt_obj = datetime.fromisoformat(timestamp_str)
+
+    # Convert the datetime object to a Unix timestamp
+    unix_timestamp = int(dt_obj.timestamp())
+
+    return unix_timestamp
+
+
 class SnowflakeConnection(DataWarehouseConnection):
     def __init__(self, username, password, account, database, schema, token, role):
         if not _has_snowflake:
@@ -112,10 +125,14 @@ def upload_records(
         records: Iterable,
         fqn: str,
         overwrite: bool,
+        timestamp: str = None,
     ):
         records_list = []
         for record in records:
             del record["METADATA"]
+            if timestamp is not None:
+                # parse timestamp from date string to int
+                record["TIMESTAMP"] = parse_timestamp_to_int(timestamp)
             records_list.append(record)
 
         data_df = pd.DataFrame(records_list)
@@ -174,6 +191,7 @@ def upload_raster(
         overwrite: bool = False,
         append: bool = False,
         cleanup_on_failure: bool = False,
+        timestamp: str = None,
     ) -> bool:
         print("Loading raster file to Snowflake...")
 
@@ -208,7 +226,7 @@ def upload_raster(
         total_blocks = get_number_of_blocks(file_path)
 
         if chunk_size is None:
-            ret = self.upload_records(records_gen, fqn, overwrite)
+            ret = self.upload_records(records_gen, fqn, overwrite, timestamp)
             if not ret:
                 raise IOError("Error uploading to Snowflake.")
         else:
@@ -221,7 +239,7 @@ def upload_raster(
                 isFirstBatch = True
                 for records in batched(records_gen, chunk_size):
                     ret = self.upload_records(
-                        records, fqn, overwrite and isFirstBatch
+                        records, fqn, overwrite and isFirstBatch, timestamp
                     )
                     pbar.update(chunk_size)
                     if not ret:
diff --git a/raster_loader/utils.py b/raster_loader/utils.py
index e3adf85..2a60140 100644
--- a/raster_loader/utils.py
+++ b/raster_loader/utils.py
@@ -29,6 +29,7 @@ def batched(iterable, n):
     while batch := tuple(islice(it, n)):  # noqa
         yield batch
 
+
 def get_default_table_name(base_path: str, band):
     table = os.path.basename(base_path).split(".")[0]
     table = "_".join([table, "band", str(band), str(uuid.uuid4())])
diff --git a/temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh b/temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh
new file mode 100644
index 0000000..5a5140c
--- /dev/null
+++ b/temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh
@@ -0,0 +1,80 @@
+    # INPUT_PATH='./temporal_raster/data/climate_data_Weather_source_era5_precip_201401010000_201501010000.nc'
+    INPUT_PATH='./temporal_raster/data/climate_data_Weather_source_era5_precip_201501010000_201601010000.nc'
+    BQ_UPLOAD=0
+    SF_UPLOAD=1
+    # cdo remapbil,global_0.25 $INPUT_PATH "${INPUT_PATH/.nc/}_regridded.nc"
+    # INPUT_PATH="${INPUT_PATH/.nc/}_regridded.nc"
+
+    timestamp=1
+    for idate in $(cdo showtimestamp $INPUT_PATH)
+    do
+        echo 'Processing idate: ' $idate
+        ymd="${idate:0:10}"   # Extract year, month, and day
+        d="${idate:8:2}"      # Extract day
+        hour="${idate:11:2}"  # Extract hour
+        gdal_translate -ot Float64 NETCDF:$INPUT_PATH:precipitation_in -b $timestamp "${INPUT_PATH/.nc/}_${d}_${hour}.tif"
+        ((timestamp++))
+
+        TIF_PATH="${INPUT_PATH/.nc/}_${d}_${hour}.tif"
+        filename=$(basename "$TIF_PATH")
+        echo $filename
+        gdalwarp -s_srs EPSG:4326 -t_srs EPSG:3857 $TIF_PATH "${TIF_PATH/.tif/_webmercator.tif}"
+        rm $TIF_PATH
+
+        WEBMERCATOR_PATH="${TIF_PATH/.tif/_webmercator.tif}"
+        OUTPUT_PATH="${WEBMERCATOR_PATH/_webmercator.tif/_quadbin.tif}"
+        gdalwarp "$WEBMERCATOR_PATH" \
+            -of COG \
+            -co TILING_SCHEME=GoogleMapsCompatible \
+            -co COMPRESS=DEFLATE -co OVERVIEWS=NONE -co ADD_ALPHA=NO -co RESAMPLING=NEAREST "$OUTPUT_PATH"
+        rm $WEBMERCATOR_PATH
+
+        # TABLE="${filename/.tif/_quadbin}"
+
+        # Get the number of bands in the GeoTIFF file
+        N_BANDS=$(gdalinfo "$OUTPUT_PATH" | grep "Band " | wc -l)
+
+        if [ $BQ_UPLOAD -eq 1 ]; then
+            # GCP_PROJECT="cartodb-data-engineering-team" GCP_DATASET="vdelacruz_carto" GCP_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh
+
+            COMMAND="echo \"yes\" | carto bigquery upload"
+            for ((band=1; band<=$N_BANDS; band++)); do
+                COMMAND+=" --band $band"
+            done
+            COMMAND+=" --file_path \"$OUTPUT_PATH\""
+            COMMAND+=" --project \"$GCP_PROJECT\""
+            COMMAND+=" --dataset \"$GCP_DATASET\""
+            COMMAND+=" --table \"$GCP_TABLE\""
+            COMMAND+=" --append"
+
+            eval "$COMMAND"
+        fi
+
+        if [ $SF_UPLOAD -eq 1 ]; then
+            # SF_DATABASE="CARTO_DATA_ENGINEERING_TEAM" SF_SCHEMA="vdelacruz_carto" SF_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000" SF_ACCOUNT="sxa81489.us-east-1" SF_USERNAME="SUPERUSER_DATA_ENG_TEAM" SF_PASSWORD="XXXXX" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh
+
+            COMMAND="echo \"yes\" | carto snowflake upload"
+            # for ((band=1; band<=$N_BANDS; band++)); do
+            #     COMMAND+=" --band $band"
+            # done
+            COMMAND+=" --band 1 --band_name precipitation"
+            COMMAND+=" --file_path \"$OUTPUT_PATH\""
+            COMMAND+=" --database \"$SF_DATABASE\""
+            COMMAND+=" --schema \"$SF_SCHEMA\""
+            COMMAND+=" --table \"$SF_TABLE\""
+            COMMAND+=" --account \"$SF_ACCOUNT\""
+            COMMAND+=" --username \"$SF_USERNAME\""
+            COMMAND+=" --password \"$SF_PASSWORD\""
+            COMMAND+=" --append"
+            COMMAND+=" --timestamp $idate"
+
+            eval "$COMMAND"
+        fi
+        rm $OUTPUT_PATH
+        # if timestamp > 10 break
+        # if [ $timestamp -gt 0 ]; then
+        #     break # Remove this line to process all the files
+        # fi
+    done
diff --git a/temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh b/temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh
new file mode 100644
index 0000000..3726c16
--- /dev/null
+++ b/temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh
@@ -0,0 +1,82 @@
+    INPUT_PATH='./temporal_raster/data2/climate_data_Weather_source_era5_precip_201401010000_201501010000.nc'
+    # INPUT_PATH='./temporal_raster/data/climate_data_Weather_source_era5_precip_201501010000_201601010000.nc'
+    BQ_UPLOAD=0
+    SF_UPLOAD=1
+    # cdo remapbil,global_0.25 $INPUT_PATH "${INPUT_PATH/.nc/}_regridded.nc"
+    # INPUT_PATH="${INPUT_PATH/.nc/}_regridded.nc"
+
+    timestamp=1
+    for idate in $(cdo showtimestamp $INPUT_PATH)
+    do
+        echo 'Processing idate: ' $idate
+        year="${idate:0:4}"   # Extract year
+        month="${idate:5:2}"  # Extract month
+        d="${idate:8:2}"      # Extract day
+        hour="${idate:11:2}"  # Extract hour
+
+        TIF_PATH="${INPUT_PATH/.nc/}_${year}_${month}_${d}_${hour}.tif"
+        WEBMERCATOR_PATH="${TIF_PATH/.tif/_webmercator.tif}"
+        OUTPUT_PATH="${WEBMERCATOR_PATH/_webmercator.tif/_quadbin.tif}"
+
+        gdal_translate -ot Float64 NETCDF:$INPUT_PATH:precipitation_in -b $timestamp "${TIF_PATH}"
+        ((timestamp++))
+
+        filename=$(basename "$TIF_PATH")
+        echo $filename
+        gdalwarp -s_srs EPSG:4326 -t_srs EPSG:3857 $TIF_PATH "${WEBMERCATOR_PATH}"
+        rm $TIF_PATH
+
+        gdalwarp "$WEBMERCATOR_PATH" \
"$WEBMERCATOR_PATH" \ + -of COG \ + -co TILING_SCHEME=GoogleMapsCompatible \ + -co COMPRESS=DEFLATE -co OVERVIEWS=NONE -co ADD_ALPHA=NO -co RESAMPLING=NEAREST "$OUTPUT_PATH" + rm $WEBMERCATOR_PATH + + # TABLE="${filename/.tif/_quadbin}" + + + # Get the number of bands in the GeoTIFF file + N_BANDS=$(gdalinfo "$OUTPUT_PATH" | grep "Band " | wc -l) + +if [ $BQ_UPLOAD -eq 1 ]; then + # GCP_PROJECT="cartodb-data-engineering-team" GCP_DATASET="vdelacruz_carto" GCP_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000_24" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh + + COMMAND="echo \"yes\" | carto bigquery upload" + for ((band=1; band<=$N_BANDS; band++)); do + COMMAND+=" --band $band" + done + COMMAND+=" --file_path \"$OUTPUT_PATH\"" + COMMAND+=" --project \"$GCP_PROJECT\"" + COMMAND+=" --dataset \"$GCP_DATASET\"" + COMMAND+=" --table \"$GCP_TABLE\"" + COMMAND+=" --append" + + eval "$COMMAND" +fi + +if [ $SF_UPLOAD -eq 1 ]; then + + # SF_DATABASE="CARTO_DATA_ENGINEERING_TEAM" SF_SCHEMA="vdelacruz_carto" SF_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000_24" SF_ACCOUNT="sxa81489.us-east-1" SF_USERNAME="SUPERUSER_DATA_ENG_TEAM" SF_PASSWORD="XXXX" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh + + COMMAND="echo \"yes\" | carto snowflake upload" + # for ((band=1; band<=$N_BANDS; band++)); do + # COMMAND+=" --band $band" + # done + COMMAND+=" --band 1 --band_name precipitation" + COMMAND+=" --file_path \"$OUTPUT_PATH\"" + COMMAND+=" --database \"$SF_DATABASE\"" + COMMAND+=" --schema \"$SF_SCHEMA\"" + COMMAND+=" --table \"$SF_TABLE\"" + COMMAND+=" --account \"$SF_ACCOUNT\"" + COMMAND+=" --username \"$SF_USERNAME\"" + COMMAND+=" --password \"$SF_PASSWORD\"" + COMMAND+=" --append" + COMMAND+=" --timestamp $idate" + + eval "$COMMAND" +fi + rm $OUTPUT_PATH +if [ $timestamp -gt 24 ]; then + break # Remove this line to process all the files +fi +done