Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modularization and further Code Cleanup #36

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions tools/code/Floods_Fathom3/common.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,4 @@
# Required libraries
import tempfile, os, gc
import warnings
from collections import OrderedDict

import numpy as np
import pandas as pd
import geopandas as gpd
from tqdm import tqdm

import rasterio
import xarray as xr
import rioxarray as rxr
from rasterstats import gen_zonal_stats, zonal_stats

import requests
from shapely.geometry import shape
import os

# Load env vars from dotenv file
from dotenv import dotenv_values
Expand All @@ -30,6 +14,7 @@

# Define the REST API URL
rest_api_url = "https://services.arcgis.com/iQ1dY19aHwbSDYIF/ArcGIS/rest/services/World_Bank_Global_Administrative_Divisions_VIEW/FeatureServer"
worldpop_url = "https://data.worldpop.org/GIS/Population/"

# Mapping of administrative levels to field names for WB GAD dataset
adm_field_mapping = {
Expand Down
123 changes: 72 additions & 51 deletions tools/code/Floods_Fathom3/runAnalysis.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
# Required libraries
import os, gc
import numpy as np
import pandas as pd
import geopandas as gpd

import rasterio
import rioxarray as rxr
from rasterstats import gen_zonal_stats, zonal_stats

import requests
from shapely.geometry import shape

# Importing the required packages
from common import *
import common
from damageFunctions import mortality_factor, damage_factor_builtup, damage_factor_agri
from tqdm import tqdm

# Importing the libraries for parallel processing
import itertools as it
from functools import partial
import multiprocess as mp


DATA_DIR = common.DATA_DIR
OUTPUT_DIR = common.OUTPUT_DIR





# Defining functions for parallel processing of zonal_stats
def chunks(data, n):
# Yield successive n-sized chunks from a slice-able iterable.
for i in range(0, len(data), n):
yield data[i:i+n]
def chunks(iterable_data, n):
it_data = it.iter(iterable_data)
for chunk in it.iter(lambda: list(it.islice(it_data, n)), []):
yield chunk



def zonal_stats_partial(feats, raster, stats="*", affine=None, nodata=None, all_touched=True):
# Partial zonal stats for a parallel processing a list of features
Expand All @@ -24,23 +46,24 @@ def zonal_stats_parallel(args):

# Function to get the correct layer ID based on administrative level
def get_layer_id_for_adm(adm_level):
layers_url = f"{rest_api_url}/layers"
response = requests.get(layers_url, params={'f': 'json'})
layers_url = f"{common.rest_api_url}/layers"
target_layer_name = f"WB_GAD_ADM{adm_level}"

if response.status_code == 200:
layers_info = response.json().get('layers', [])
target_layer_name = f"WB_GAD_ADM{adm_level}"

for layer in layers_info:
if layer['name'] == target_layer_name:
return layer['id']

print(f"Layer matching {target_layer_name} not found.")
return None
else:
response = requests.get(layers_url, params={'f': 'json'})

if response.status_code != 200:
print(f"Failed to fetch layers. Status code: {response.status_code}")
return None

layers_info = response.json().get('layers', [])

for layer in layers_info:
if layer['name'] == target_layer_name:
return layer['id']

print(f"Layer matching {target_layer_name} not found.")
return None

# Function to fetch the ADM data using the correct layer ID
def get_adm_data(country, adm_level):
layer_id = get_layer_id_for_adm(adm_level)
Expand All @@ -49,7 +72,7 @@ def get_adm_data(country, adm_level):
print("Invalid administrative level or layer mapping not found.")
return None

query_url = f"{rest_api_url}/{layer_id}/query"
query_url = f"{common.rest_api_url}/{layer_id}/query"
params = {
'where': f"ISO_A3 = '{country}'",
'outFields': '*',
Expand All @@ -76,22 +99,21 @@ def get_adm_data(country, adm_level):

# Defining the function to download WorldPop data
def fetch_population_data(country: str, exp_year: str):
base_url = "https://data.worldpop.org/GIS/Population/"
dataset_path = f"Global_2000_2020_Constrained/{exp_year}/BSGM/{country}/{country.lower()}_ppp_{exp_year}_UNadj_constrained.tif"
download_url = f"{base_url}{dataset_path}"
download_url = f"{common.worldpop_url}{dataset_path}"

try:
response = requests.get(download_url)

if response.status_code == 200:
file_name = f"{DATA_DIR}/EXP/{country}_POP{exp_year}.tif"
with open(file_name, 'wb') as file:
file.write(response.content)
print(f"Data downloaded successfully and saved as {file_name}")
else:

if response.status_code != 200:
print(f"Failed to fetch data. Status code: {response.status_code}")
print(f"Response text: {response.text}")

file_name = f"{DATA_DIR}/EXP/{country}_POP{exp_year}.tif"
with open(file_name, 'wb') as file:
file.write(response.content)
print(f"Data downloaded successfully and saved as {file_name}")

except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")

Expand Down Expand Up @@ -145,7 +167,7 @@ def run_analysis(country: str, haz_cat: str, period: str, scenario: str, valid_R

if adm_data is not None:
# Get the correct field names based on the administrative level
field_names = adm_field_mapping.get(adm_level, {})
field_names = common.adm_field_mapping.get(adm_level, {})
code_field = field_names.get('code')
name_field = field_names.get('name')
wb_region = adm_data['WB_REGION'].iloc[0]
Expand Down Expand Up @@ -225,7 +247,7 @@ def run_analysis(country: str, haz_cat: str, period: str, scenario: str, valid_R
'prob_RPs_LB':prob_RPs_LB,
'prob_RPs_UB':prob_RPs_UB,
'prob_RPs_Mean':prob_RPs_Mean})
prob_RPs_df.to_csv(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_prob_RPs.csv"), index=False)
prob_RPs_df.to_csv(os.path.join(common.OUTPUT_DIR, f"{country}_{haz_cat}_prob_RPs.csv"), index=False)

# Computing the results for each RP
n_valid_RPs_gt_1 = len(valid_RPs) > 1
Expand All @@ -236,6 +258,8 @@ def run_analysis(country: str, haz_cat: str, period: str, scenario: str, valid_R
"analysis_type": analysis_type,
"country": country,
"haz_cat": haz_cat,
"period": period,
"scenario": scenario,
"exp_cat": exp_cat,
"exp_data": exp_data,
"min_haz_threshold": min_haz_threshold,
Expand All @@ -257,17 +281,14 @@ def run_analysis(country: str, haz_cat: str, period: str, scenario: str, valid_R
result_df = result_df_reorder_columns(result_df, valid_RPs, analysis_type, exp_cat,
adm_level, all_adm_codes, all_adm_names)
result_df = calc_EAEI(result_df, valid_RPs, prob_RPs_df, 'LB',
analysis_type, country, haz_cat, exp_cat,
adm_level, all_adm_codes, all_adm_names,
save_check_raster, num_bins, n_valid_RPs_gt_1)
analysis_type, country, haz_cat, period, scenario, exp_cat,
adm_level, save_check_raster, num_bins, n_valid_RPs_gt_1)
result_df = calc_EAEI(result_df, valid_RPs, prob_RPs_df, 'UB',
analysis_type, country, haz_cat, exp_cat,
adm_level, all_adm_codes, all_adm_names,
save_check_raster, num_bins, n_valid_RPs_gt_1)
analysis_type, country, haz_cat, period, scenario, exp_cat,
adm_level, save_check_raster, num_bins, n_valid_RPs_gt_1)
result_df = calc_EAEI(result_df, valid_RPs, prob_RPs_df, 'Mean',
analysis_type, country, haz_cat, exp_cat,
adm_level, all_adm_codes, all_adm_names,
save_check_raster, num_bins, n_valid_RPs_gt_1)
analysis_type, country, haz_cat, period, scenario, exp_cat,
adm_level, save_check_raster, num_bins, n_valid_RPs_gt_1)
result_df = result_df.round(3) # Round to three decimal places to avoid giving the impression of high precision

# If method == 'Mean', then simplify it's name
Expand All @@ -282,12 +303,12 @@ def run_analysis(country: str, haz_cat: str, period: str, scenario: str, valid_R
file_prefix = f"{country}_ADM{adm_level}_{haz_cat}_{exp_cat}_{exp_year}"
if analysis_type == "Function":
EAI_string = "EAI_" if len(valid_RPs) > 1 else ""
no_geom.to_csv(os.path.join(OUTPUT_DIR, f"{file_prefix}_{EAI_string}function.csv"), index=False)
result_df.to_file(os.path.join(OUTPUT_DIR, f"{file_prefix}_{EAI_string}function.gpkg"))
no_geom.to_csv(os.path.join(common.OUTPUT_DIR, f"{file_prefix}_{EAI_string}function.csv"), index=False)
result_df.to_file(os.path.join(common.OUTPUT_DIR, f"{file_prefix}_{EAI_string}function.gpkg"))
elif analysis_type == "Classes":
EAE_string = "EAE_" if len(valid_RPs) > 1 else ""
no_geom.to_csv(os.path.join(OUTPUT_DIR, f"{file_prefix}_{EAE_string}class.csv"), index=False)
result_df.to_file(os.path.join(OUTPUT_DIR, f"{file_prefix}_{EAE_string}class.gpkg"))
no_geom.to_csv(os.path.join(common.OUTPUT_DIR, f"{file_prefix}_{EAE_string}class.csv"), index=False)
result_df.to_file(os.path.join(common.OUTPUT_DIR, f"{file_prefix}_{EAE_string}class.gpkg"))

# Cleaning-up memory
del (country, haz_cat, valid_RPs, min_haz_threshold, exp_cat, adm_level)
Expand All @@ -301,7 +322,7 @@ def run_analysis(country: str, haz_cat: str, period: str, scenario: str, valid_R
print("Finished analysis.")


def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, exp_cat, exp_data, min_haz_threshold,
def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, period, scenario, exp_cat, exp_data, min_haz_threshold,
damage_factor, save_check_raster, bin_seq, num_bins, adm_data, wb_region):
"""
Apply calculates for each given return period.
Expand Down Expand Up @@ -337,7 +358,7 @@ def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, exp_cat, exp_
# Assign impact factor (this is F_i in the equations)
haz_data = damage_factor(haz_data, wb_region)
if save_check_raster:
haz_data.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{period}_{scenario}_{rp}_{exp_cat}_haz_imp_factor.tif"))
haz_data.rio.to_raster(os.path.join(common.OUTPUT_DIR, f"{country}_{haz_cat}_{period}_{scenario}_{rp}_{exp_cat}_haz_imp_factor.tif"))
elif analysis_type == "Classes":
# Assign bin values to raster data - Follows: x_{i-1} <= x_{i} < x_{i+1}
bin_idx = np.digitize(haz_data, bin_seq)
Expand All @@ -347,7 +368,7 @@ def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, exp_cat, exp_
affected_exp = exp_data.where(haz_data.data > 0, np.nan)

if save_check_raster:
affected_exp.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{period}_{scenario}_{rp}_{exp_cat}_affected.tif"))
affected_exp.rio.to_raster(os.path.join(common.OUTPUT_DIR, f"{country}_{haz_cat}_{period}_{scenario}_{rp}_{exp_cat}_affected.tif"))

# Conduct analyses for classes
if analysis_type == "Classes":
Expand All @@ -373,7 +394,7 @@ def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, exp_cat, exp_
impact_exp = affected_exp.data * haz_data
# If save intermediate to disk is TRUE, then
if save_check_raster:
impact_exp.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{period}_{scenario}_{rp}_{exp_cat}_impact.tif"))
impact_exp.rio.to_raster(os.path.join(common.OUTPUT_DIR, f"{country}_{period}_{scenario}_{rp}_{exp_cat}_impact.tif"))
# Compute the impact per ADM level
impact_exp_per_ADM = gen_zonal_stats(vectors=adm_data["geometry"], raster=impact_exp.data, stats=["sum"],
affine=impact_exp.rio.transform(), nodata=np.nan)
Expand All @@ -400,8 +421,8 @@ def result_df_reorder_columns(result_df, RPs, analysis_type, exp_cat, adm_level,

return result_df

def calc_EAEI(result_df, RPs, prob_RPs_df, method, analysis_type, country, haz_cat, exp_cat,
adm_level, all_adm_codes, all_adm_names, save_check_raster, num_bins, n_valid_RPs_gt_1):
def calc_EAEI(result_df, RPs, prob_RPs_df, method, analysis_type, country, haz_cat, period, scenario, exp_cat,
adm_level, save_check_raster, num_bins, n_valid_RPs_gt_1):
"""
Computes the EAE/EAI over each given return period.
"""
Expand All @@ -424,7 +445,7 @@ def calc_EAEI(result_df, RPs, prob_RPs_df, method, analysis_type, country, haz_c
result_df[f"RP{rp}_EAI_tmp"] = result_df[f"RP{rp}_{exp_cat}_imp"] * freq
if save_check_raster:
EAI_i = impact_exp * freq
EAI_i.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{period}_{scenario}_{rp}_{exp_cat}_EAI.tif"))
EAI_i.rio.to_raster(os.path.join(common.OUTPUT_DIR, f"{country}_{haz_cat}_{period}_{scenario}_{rp}_{exp_cat}_EAI.tif"))

# Computing the EAE or EAI, if probabilistic (len(valid_RPs)>1)
if n_valid_RPs_gt_1:
Expand Down
Loading