From b9038d7512edb054c986746f70547af90f583d31 Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Wed, 6 Nov 2024 12:37:15 +0000 Subject: [PATCH 01/10] process input chile data to CSV --- Chile/CHL_extract_microdata_excel_to_csv.py | 31 +++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 Chile/CHL_extract_microdata_excel_to_csv.py diff --git a/Chile/CHL_extract_microdata_excel_to_csv.py b/Chile/CHL_extract_microdata_excel_to_csv.py new file mode 100644 index 0000000..c6a6a15 --- /dev/null +++ b/Chile/CHL_extract_microdata_excel_to_csv.py @@ -0,0 +1,31 @@ +# Databricks notebook source +# MAGIC %run ../utils + +# COMMAND ---------- + +import pandas as pd +import numpy as np + +COUNTRY = 'Chile' + +microdata_csv_dir = prepare_microdata_csv_dir(COUNTRY) +filename = input_excel_filename(COUNTRY) +disaggregated_data_sheets = ['Cen', 'Mun', 'Mun2'] + +for sheet in tqdm(disaggregated_data_sheets): + csv_file_path = f'{microdata_csv_dir}/{sheet}.csv' + df = pd.read_excel(filename, sheet_name=sheet, header=0) + + # Handle unnamed or null named columns + header = [col_name for col_name in df.columns if is_named_column(col_name)] + df = df[header] + df.columns = [col.strip() for col in header] + + # Normalize cells + df = df.applymap(normalize_cell) + + # Remove rows where all values are null + df = df.dropna(how='all') + + # Write to CSV + df.to_csv(csv_file_path, index=False, encoding='utf-8') From ff691bd60ff6b36b1bb70eb2a8fb47d2517cefba Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Wed, 6 Nov 2024 12:39:00 +0000 Subject: [PATCH 02/10] Added econ, func, admin, categories along with merging the two municipal sheets Note: Intersections between categories still exists. --- Chile/CHL_transform_load_dlt.py | 187 ++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 Chile/CHL_transform_load_dlt.py diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py new file mode 100644 index 0000000..d8c16ee --- /dev/null +++ b/Chile/CHL_transform_load_dlt.py @@ -0,0 +1,187 @@ +# Databricks notebook source +import dlt +import unicodedata +from pyspark.sql.functions import substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower +from pyspark.sql.types import StringType + +# Note DLT requires the path to not start with /dbfs +TOP_DIR = "/mnt/DAP/data/BOOSTProcessed" +INPUT_DIR = f"{TOP_DIR}/Documents/input/Countries" +WORKSPACE_DIR = f"{TOP_DIR}/Workspace" +COUNTRY = 'Chile' +COUNTRY_MICRODATA_DIR = f'{WORKSPACE_DIR}/microdata_csv/{COUNTRY}' + +CSV_READ_OPTIONS = { + "header": "true", + "multiline": "true", + "quote": '"', + "escape": '"', +} + +@dlt.expect_or_drop("year_not_null", "year IS NOT NULL") +@dlt.table(name=f'chl_boost_bronze_cen') +def boost_bronze_cen(): + # Load the data from CSV + return (spark.read + .format("csv") + .options(**CSV_READ_OPTIONS) + .option("inferSchema", "true") + .load(f'{COUNTRY_MICRODATA_DIR}/Cen.csv') + .withColumn("econ1", col("econ1").cast("string")) + .withColumn("transfer", col("transfer").cast("string")) + .withColumn("econ2", col("econ2").cast("string")) + .filter( + ((col('year')<2017) & (lower(col('transfer'))=='excluye consolidables') & (~col('econ2').rlike('^(28|32|34)'))) | + ((col('year')>2016) & (~(col('econ1').isin('Financiamiento', 'Consolidable')))) + ) + ) + +@dlt.expect_or_drop("year_not_null", "year IS NOT NULL") +@dlt.table(name=f'chl_boost_bronze_municipal') +def boost_bronze_municipal(): + df1 = (spark.read + .format("csv") + .options(**CSV_READ_OPTIONS) + .option("inferSchema", "true") + .load(f'{COUNTRY_MICRODATA_DIR}/Mun.csv') + .withColumn("econ0", col("econ0").cast("string")) + .withColumn("econ1", col("econ1").cast("string")) + .withColumn("econ2", col("econ2").cast("string")) + .withColumn("econ3", col("econ3").cast("string")) + .withColumn("econ4", col("econ4").cast("string")) + .filter((col('econ0') == '2 Gasto') & (lower(col('econ1')) != 'gastos por actividades de financiacion')) + .withColumnRenamed('accrued', 'executed') + ) + df2 = (spark.read + .format("csv") + .options(**CSV_READ_OPTIONS) + .option("inferSchema", "true") + .load(f'{COUNTRY_MICRODATA_DIR}/Mun2.csv') + .withColumn("ECON0", col("ECON0").cast("string")) + .withColumn("ECON1", col("ECON1").cast("string")) + .withColumn("ECON2", col("ECON2").cast("string")) + .withColumn("ECON3", col("ECON3").cast("string")) + .withColumn("ECON4", col("ECON4").cast("string")) + .filter((col('ECON0') == '2 Gasto') & (lower(col('ECON1')) != 'gastos por actividades de financiacion')) + .withColumnRenamed('ACCRUED', 'executed') + ) + df2 = df2.toDF(*[col.lower() for col in df2.columns]) + df = df1.unionByName(df2, allowMissingColumns=True) + return df + +@dlt.table(name='chl_boost_silver') +def boost_silver(): + cen = dlt.read('chl_boost_bronze_cen' + ).withColumn('year', col('year').cast('long') + ).withColumn( + 'sheet', lit('Cen') + ).withColumn( + 'admin0', lit('Central') + ).withColumn( + 'admin1', lit('Central Scope') + ).withColumn('geo1', lit('') + ).withColumn('func_sub', + when(col('admin1').rlike('^(03|10)'), 'judiciary') + .when(col('admin1').startswith('05'), 'public safety') + # No breakdown of education spending into primary, secondary etc + # No breakdown of health expenditure into primary, secondary etc + # agriculture + .when((col('admin1').startswith('13') & (lower(col('admin2'))!='05 corporacion nacional forestal')), 'agriculture') + # railroads + .when((col('program1').rlike('^(190102|190103)') | (col('program1').isin("02 S.Y Adm.Gral.Transporte-Empresa Ferrocarriles Del Estado","03 TRANSANTIAGO"))), 'railroads') + # water transport + .when(col('program1') == '06 D.G.O.P.-Direccion De Obras Portuarias', 'water transport') + # air transport + .when(col('program1') == '07 D.G.O.P.-Direccion De Aeropuertos', 'air transport') + ).withColumn('func', + when(col('admin1').startswith('11'), 'Defence') + .when(col('func_sub').isin('judiciary', 'public safety'), "Public order and safety") + .when(( + col('admin1').rlike('^(07|17)') | + col('func_sub').isin('agriculture', 'railroads', 'water transport', 'air transport') | + (col('road_transport')=='road') + ), 'Economic affairs') + .when(( + col('admin2').rlike('^(1305|2501|2502|2503)') | + col('admin2').isin('05 Corporacion Nacional Forestal', + '03 Tercer Tribunal Ambiental', + '03 Superintendencia Del Medio Ambiente', + '02 Segundo Tribunal Ambiental', + '02 Servicio De Evaluacion Ambiental', + '01 Subsecretaria De Medio Ambiente', + '01 Primer Tribunal Ambiental') + ), 'Environmental protection') + .when(col('admin1').startswith('18'), 'Housing and community amenities') + .when(col('admin1').startswith('16'), 'Health') + .when(col('admin1').startswith('09'), 'Education') + # No recreation and culture spending information + # No infromation on social protection + .otherwise('General public services') + ).withColumn('econ_sub', + # social assistance + when(lower(col('econ3')) == '02 prestaciones de asistencia social', 'social assistance') + # pensions + .when(lower(col('econ3')) == '01 Prestaciones Previsionales', 'pensions') + ).withColumn('econ', + # wage bill + when(col('econ2').startswith('21'), 'Wage bill') + # capex + .when(col('econ1')=='Gasto De Capital', 'Capital expenditures') + # goods and services + .when(col('econ2').startswith('22'), 'Goods and services') + # subsidies + .when(col('subsidies')=='y', 'Subsidies') + # social benefits + .when(col('econ_sub').isin('social assistance', 'pensions'), 'Social benefits') + # Other grants and trasnfers + .when(lower(col('econ3'))=='03 a otras entidades publicas', 'Other grants and transfers') + # interest on debt + .when(lower(col('econ3')).isin("03 intereses deuda interna", + "04 intereses deuda externa", + "05 otros gastos financieros deuda interna", + "06 otros gastos financieros deuda externa"), 'Interest on debt') + .otherwise('Other expenses') + ) + + mun = dlt.read('chl_boost_bronze_municipal' + ).withColumn( + 'sheet', lit('Mun') + ).withColumn( + 'admin0', lit('Regional') + ).withColumn('geo1', lit('') + ).withColumn('func', + when(lower(col('service2'))=='area de salud', 'Health') + .when(trim(lower(col('service2')))=='area de educacion', 'Education') + ).withColumn('econ', + # wage bill + when(col('econ2').startswith('21'), 'Wage bill') + # capex + .when(lower(col('econ1'))=='gastos por actividades de inversion', 'Capital expenditures') + # goods and services + .when(col('econ2').startswith('22'), 'Goods and services') + # subsidies + .when(((col('econ2')=='24 transferencias corrientes') & (col('econ3') == '01 al sector privado')), 'Subsidies') + # interest on debt + .when(lower(col('econ3')).isin('03 intereses deuda interna', '05 otros gastos financieros deuda interna'), 'Interest on debt') + ) + return cen.unionByName(mun, allowMissingColumns=True) + +@dlt.table(name=f'chl_boost_gold') +def boost_gold(): + return (dlt.read(f'chl_boost_silver') + .withColumn('country_name', lit(COUNTRY)) + .select('country_name', + 'year', + 'approved', + col('modified').alias('revised'), + col('executed').cast('int'), + 'geo1', + 'admin0', + 'admin1', + 'admin2', + 'func_sub', + 'func', + 'econ_sub', + 'econ') + ) + From 49258a2f7355cfca5ce37f8e562fbce1f4f25dc0 Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Tue, 12 Nov 2024 21:44:35 +0000 Subject: [PATCH 03/10] Corrected nulls in econ columns. Added admin1_tmp and admin0_tmp to avoid clash with existing columns. Lower case to avoid missing variations of condition --- Chile/CHL_transform_load_dlt.py | 60 +++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py index d8c16ee..2f2cca4 100644 --- a/Chile/CHL_transform_load_dlt.py +++ b/Chile/CHL_transform_load_dlt.py @@ -31,8 +31,8 @@ def boost_bronze_cen(): .withColumn("transfer", col("transfer").cast("string")) .withColumn("econ2", col("econ2").cast("string")) .filter( - ((col('year')<2017) & (lower(col('transfer'))=='excluye consolidables') & (~col('econ2').rlike('^(28|32|34)'))) | - ((col('year')>2016) & (~(col('econ1').isin('Financiamiento', 'Consolidable')))) + ((col('year')<2017) & (lower(col('transfer'))=='excluye consolidables') & (~col('econ2').rlike('^(28|32|34|30)'))) | + ((col('year')>2016) & (~(lower(col('econ1')).isin('financiamiento', 'consolidable')))) ) ) @@ -44,11 +44,11 @@ def boost_bronze_municipal(): .options(**CSV_READ_OPTIONS) .option("inferSchema", "true") .load(f'{COUNTRY_MICRODATA_DIR}/Mun.csv') - .withColumn("econ0", col("econ0").cast("string")) - .withColumn("econ1", col("econ1").cast("string")) - .withColumn("econ2", col("econ2").cast("string")) - .withColumn("econ3", col("econ3").cast("string")) - .withColumn("econ4", col("econ4").cast("string")) + .withColumn("econ0", when(col("econ0").isNull(), "").otherwise(col("econ0").cast("string"))) + .withColumn("econ1", when(col("econ1").isNull(), "").otherwise(col("econ1").cast("string"))) + .withColumn("econ2", when(col("econ2").isNull(), "").otherwise(col("econ2").cast("string"))) + .withColumn("econ3", when(col("econ3").isNull(), "").otherwise(col("econ3").cast("string"))) + .withColumn("econ4", when(col("econ4").isNull(), "").otherwise(col("econ4").cast("string"))) .filter((col('econ0') == '2 Gasto') & (lower(col('econ1')) != 'gastos por actividades de financiacion')) .withColumnRenamed('accrued', 'executed') ) @@ -57,11 +57,11 @@ def boost_bronze_municipal(): .options(**CSV_READ_OPTIONS) .option("inferSchema", "true") .load(f'{COUNTRY_MICRODATA_DIR}/Mun2.csv') - .withColumn("ECON0", col("ECON0").cast("string")) - .withColumn("ECON1", col("ECON1").cast("string")) - .withColumn("ECON2", col("ECON2").cast("string")) - .withColumn("ECON3", col("ECON3").cast("string")) - .withColumn("ECON4", col("ECON4").cast("string")) + .withColumn("ECON0", when(col("ECON0").isNull(), "").otherwise(col("ECON0").cast("string"))) + .withColumn("ECON1", when(col("ECON1").isNull(), "").otherwise(col("ECON1").cast("string"))) + .withColumn("ECON2", when(col("ECON2").isNull(), "").otherwise(col("ECON2").cast("string"))) + .withColumn("ECON3", when(col("ECON3").isNull(), "").otherwise(col("ECON3").cast("string"))) + .withColumn("ECON4", when(col("ECON4").isNull(), "").otherwise(col("ECON4").cast("string"))) .filter((col('ECON0') == '2 Gasto') & (lower(col('ECON1')) != 'gastos por actividades de financiacion')) .withColumnRenamed('ACCRUED', 'executed') ) @@ -76,9 +76,9 @@ def boost_silver(): ).withColumn( 'sheet', lit('Cen') ).withColumn( - 'admin0', lit('Central') + 'admin0_tmp', lit('Central') ).withColumn( - 'admin1', lit('Central Scope') + 'admin1_tmp', lit('Central Scope') ).withColumn('geo1', lit('') ).withColumn('func_sub', when(col('admin1').rlike('^(03|10)'), 'judiciary') @@ -86,20 +86,26 @@ def boost_silver(): # No breakdown of education spending into primary, secondary etc # No breakdown of health expenditure into primary, secondary etc # agriculture - .when((col('admin1').startswith('13') & (lower(col('admin2'))!='05 corporacion nacional forestal')), 'agriculture') + .when((col('year')<2020) & (col('admin1').startswith('13') & (lower(col('admin2'))!='05 corporacion nacional forestal')), 'agriculture') + .when((col('year')>2019) & (col('admin1').startswith('13') & (~col('admin2').startswith('1305'))), 'agriculture') + # roads + .when(col('road_transport')=='road', 'road') # railroads - .when((col('program1').rlike('^(190102|190103)') | (col('program1').isin("02 S.Y Adm.Gral.Transporte-Empresa Ferrocarriles Del Estado","03 TRANSANTIAGO"))), 'railroads') + .when((col('program1').rlike('^(190102|190103)') | (lower(col('program1')).isin("02 S.Y Adm.Gral.Transporte-Empresa Ferrocarriles Del Estado".lower(),"03 TRANSANTIAGO".lower()))), 'railroads') # water transport - .when(col('program1') == '06 D.G.O.P.-Direccion De Obras Portuarias', 'water transport') + .when(lower(col('program1')) == '06 D.G.O.P.-Direccion De Obras Portuarias'.lower(), 'water transport') # air transport - .when(col('program1') == '07 D.G.O.P.-Direccion De Aeropuertos', 'air transport') + .when(lower(col('program1')) == '07 D.G.O.P.-Direccion De Aeropuertos'.lower(), 'air transport') + # energy + .when((((col('admin1').startswith('17'))&(lower(col('admin2')).isin("04 comision chilena de energia nuclear","05 comision nacional de energia")))| + (col('admin1').startswith('24'))), 'energy') + ).withColumn('func', when(col('admin1').startswith('11'), 'Defence') .when(col('func_sub').isin('judiciary', 'public safety'), "Public order and safety") .when(( - col('admin1').rlike('^(07|17)') | - col('func_sub').isin('agriculture', 'railroads', 'water transport', 'air transport') | - (col('road_transport')=='road') + ((col('admin1').rlike('^(07|17)') & (~lower(col('admin2')).isin("04 comision chilena de energia nuclear", "05 comision nacional de energia")))) | + col('func_sub').isin('agriculture', 'road', 'railroads', 'water transport', 'air transport', 'energy') ), 'Economic affairs') .when(( col('admin2').rlike('^(1305|2501|2502|2503)') | @@ -121,7 +127,7 @@ def boost_silver(): # social assistance when(lower(col('econ3')) == '02 prestaciones de asistencia social', 'social assistance') # pensions - .when(lower(col('econ3')) == '01 Prestaciones Previsionales', 'pensions') + .when(lower(col('econ3')) == '01 prestaciones previsionales', 'pensions') ).withColumn('econ', # wage bill when(col('econ2').startswith('21'), 'Wage bill') @@ -152,6 +158,7 @@ def boost_silver(): ).withColumn('func', when(lower(col('service2'))=='area de salud', 'Health') .when(trim(lower(col('service2')))=='area de educacion', 'Education') + .otherwise('General public services') ).withColumn('econ', # wage bill when(col('econ2').startswith('21'), 'Wage bill') @@ -160,9 +167,10 @@ def boost_silver(): # goods and services .when(col('econ2').startswith('22'), 'Goods and services') # subsidies - .when(((col('econ2')=='24 transferencias corrientes') & (col('econ3') == '01 al sector privado')), 'Subsidies') + .when(((lower(col('econ2'))=='24 transferencias corrientes') & (lower(col('econ3')) == '01 al sector privado')), 'Subsidies') # interest on debt .when(lower(col('econ3')).isin('03 intereses deuda interna', '05 otros gastos financieros deuda interna'), 'Interest on debt') + .otherwise('Other expenses') ) return cen.unionByName(mun, allowMissingColumns=True) @@ -174,10 +182,10 @@ def boost_gold(): 'year', 'approved', col('modified').alias('revised'), - col('executed').cast('int'), + 'executed', 'geo1', - 'admin0', - 'admin1', + col('admin0_tmp').alias('admin0'), + col('admin1_tmp').alias('admin1'), 'admin2', 'func_sub', 'func', From dcace6553ec151c60c6a9dfa92e4e5864ccd301c Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Thu, 21 Nov 2024 19:00:49 +0000 Subject: [PATCH 04/10] modified admin1 to admin1_tmp to not overwrite the original admin1 column. Changed type of year --- Chile/CHL_transform_load_dlt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py index 2f2cca4..be27ff3 100644 --- a/Chile/CHL_transform_load_dlt.py +++ b/Chile/CHL_transform_load_dlt.py @@ -153,7 +153,7 @@ def boost_silver(): ).withColumn( 'sheet', lit('Mun') ).withColumn( - 'admin0', lit('Regional') + 'admin0_tmp', lit('Regional') ).withColumn('geo1', lit('') ).withColumn('func', when(lower(col('service2'))=='area de salud', 'Health') @@ -179,7 +179,7 @@ def boost_gold(): return (dlt.read(f'chl_boost_silver') .withColumn('country_name', lit(COUNTRY)) .select('country_name', - 'year', + col('year').cast("integer"), 'approved', col('modified').alias('revised'), 'executed', From a3860cbe7d9c14c79f09e8c16d2fff9cd664f2cc Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Thu, 21 Nov 2024 19:01:21 +0000 Subject: [PATCH 05/10] Added chl to pipeline --- cross_country_aggregate_dlt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cross_country_aggregate_dlt.py b/cross_country_aggregate_dlt.py index 51022df..adf67a3 100644 --- a/cross_country_aggregate_dlt.py +++ b/cross_country_aggregate_dlt.py @@ -5,7 +5,7 @@ from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType # Adding a new country requires adding the country here -country_codes = ['moz', 'pry', 'ken', 'pak', 'bfa', 'col', 'cod', 'nga', 'tun', 'btn', 'bgd', 'alb', 'ury', "zaf"] +country_codes = ['moz', 'pry', 'ken', 'pak', 'bfa', 'col', 'cod', 'nga', 'tun', 'btn', 'bgd', 'alb', 'ury', "zaf", 'chl'] schema = StructType([ StructField("country_name", StringType(), True, {'comment': 'The name of the country for which the budget data is recorded (e.g., "Kenya", "Brazil").'}), From 2e3cfd1150705c813a6e206f2173a9c06e173684 Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Tue, 10 Dec 2024 17:45:35 +0000 Subject: [PATCH 06/10] Modifications to take into account eh discrepancies. Added name mapping to align with subnational population. --- Chile/CHL_transform_load_dlt.py | 97 +++++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py index be27ff3..86f8313 100644 --- a/Chile/CHL_transform_load_dlt.py +++ b/Chile/CHL_transform_load_dlt.py @@ -1,7 +1,7 @@ # Databricks notebook source import dlt import unicodedata -from pyspark.sql.functions import substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower +from pyspark.sql.functions import substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map from pyspark.sql.types import StringType # Note DLT requires the path to not start with /dbfs @@ -18,6 +18,26 @@ "escape": '"', } +region_mapping = { + "IX": "Araucanía", + "RM": "Región Metropolitana de Santiago", + "I": "Tarapacá", + "II": "Antofagasta", + "III": "Atacama", + "IV": "Coquimbo", + "V": "Valparaíso", + "VI": "Libertador General Bernardo O'Higgins", + "VII": "Maule", + "VIII": "Biobío", + "X": "Los Lagos", + "XI": "Aysén", + "XII": "Magallanes y la Antártica Chilena", + "XIV": "Los Ríos", + "XV": "Arica y Parinacota", + "XVI": "Ñuble", +} +region_mapping_expr = create_map([lit(key) for pair in region_mapping.items() for key in pair]) + @dlt.expect_or_drop("year_not_null", "year IS NOT NULL") @dlt.table(name=f'chl_boost_bronze_cen') def boost_bronze_cen(): @@ -27,12 +47,32 @@ def boost_bronze_cen(): .options(**CSV_READ_OPTIONS) .option("inferSchema", "true") .load(f'{COUNTRY_MICRODATA_DIR}/Cen.csv') + .withColumn("region", when(col("region").isNull(), "").otherwise(col("region").cast("string"))) .withColumn("econ1", col("econ1").cast("string")) .withColumn("transfer", col("transfer").cast("string")) .withColumn("econ2", col("econ2").cast("string")) + .withColumn("region", region_mapping_expr.getItem(col("region"))) .filter( - ((col('year')<2017) & (lower(col('transfer'))=='excluye consolidables') & (~col('econ2').rlike('^(28|32|34|30)'))) | - ((col('year')>2016) & (~(lower(col('econ1')).isin('financiamiento', 'consolidable')))) + ( + (col('year') < 2017) & + ((lower(col('transfer')) == 'excluye consolidables') & (~col('econ2').rlike('^(32|30)'))) & + ( + (~col('econ2').rlike('^(28|34)')) | + (lower(col('econ3')).isin("03 intereses deuda interna", + "04 intereses deuda externa", + "05 otros gastos financieros deuda interna", + "06 otros gastos financieros deuda externa")) + ) + ) | + ((col('year') > 2016) & + ( + (~(lower(col('econ1')).isin('financiamiento', 'consolidable'))) | + (lower(col('econ3')).isin('04 intereses deuda externa', + '06 otros gastos financieros deuda externa', + '03 intereses deuda interna', + '05 otros gastos financieros deuda interna')) + ) + ) ) ) @@ -49,8 +89,11 @@ def boost_bronze_municipal(): .withColumn("econ2", when(col("econ2").isNull(), "").otherwise(col("econ2").cast("string"))) .withColumn("econ3", when(col("econ3").isNull(), "").otherwise(col("econ3").cast("string"))) .withColumn("econ4", when(col("econ4").isNull(), "").otherwise(col("econ4").cast("string"))) + .withColumn("Region", when(col("Region").isNull(), "").otherwise(col("Region").cast("string"))) .filter((col('econ0') == '2 Gasto') & (lower(col('econ1')) != 'gastos por actividades de financiacion')) .withColumnRenamed('accrued', 'executed') + .withColumnRenamed('Servicio', 'service') + .withColumnRenamed('Region', 'region') ) df2 = (spark.read .format("csv") @@ -62,11 +105,14 @@ def boost_bronze_municipal(): .withColumn("ECON2", when(col("ECON2").isNull(), "").otherwise(col("ECON2").cast("string"))) .withColumn("ECON3", when(col("ECON3").isNull(), "").otherwise(col("ECON3").cast("string"))) .withColumn("ECON4", when(col("ECON4").isNull(), "").otherwise(col("ECON4").cast("string"))) + .withColumn("Region", when(col("Region").isNull(), "").otherwise(col("Region").cast("string"))) .filter((col('ECON0') == '2 Gasto') & (lower(col('ECON1')) != 'gastos por actividades de financiacion')) .withColumnRenamed('ACCRUED', 'executed') + .withColumnRenamed('Service2', 'service') + .withColumnRenamed('Region', 'region') ) df2 = df2.toDF(*[col.lower() for col in df2.columns]) - df = df1.unionByName(df2, allowMissingColumns=True) + df = df1.unionByName(df2, allowMissingColumns=True).withColumn("region", region_mapping_expr.getItem(col("region"))) return df @dlt.table(name='chl_boost_silver') @@ -78,11 +124,19 @@ def boost_silver(): ).withColumn( 'admin0_tmp', lit('Central') ).withColumn( - 'admin1_tmp', lit('Central Scope') - ).withColumn('geo1', lit('') + 'admin1_tmp', when((col('region').isNull()) | (col('region') == ''), lit('Central Scope')).otherwise(col('region')) + ).withColumn( + 'admin2_tmp', col('admin2') + ).withColumn('geo1', lit('Central Scope') + ).withColumn('interest_tmp', + when(lower(col('econ3')).isin('04 intereses deuda externa', + '06 otros gastos financieros deuda externa', + '03 intereses deuda interna', + '05 otros gastos financieros deuda interna'), 1) + .otherwise(0) ).withColumn('func_sub', - when(col('admin1').rlike('^(03|10)'), 'judiciary') - .when(col('admin1').startswith('05'), 'public safety') + when((col('admin1').rlike('^(03|10)')) & (col('interest_tmp')==0), 'judiciary') + .when((col('admin1').startswith('05')) & (col('interest_tmp')==0), 'public safety') # No breakdown of education spending into primary, secondary etc # No breakdown of health expenditure into primary, secondary etc # agriculture @@ -103,7 +157,7 @@ def boost_silver(): ).withColumn('func', when(col('admin1').startswith('11'), 'Defence') .when(col('func_sub').isin('judiciary', 'public safety'), "Public order and safety") - .when(( + .when((col('interest_tmp')==0) & ( ((col('admin1').rlike('^(07|17)') & (~lower(col('admin2')).isin("04 comision chilena de energia nuclear", "05 comision nacional de energia")))) | col('func_sub').isin('agriculture', 'road', 'railroads', 'water transport', 'air transport', 'energy') ), 'Economic affairs') @@ -118,8 +172,8 @@ def boost_silver(): '01 Primer Tribunal Ambiental') ), 'Environmental protection') .when(col('admin1').startswith('18'), 'Housing and community amenities') - .when(col('admin1').startswith('16'), 'Health') - .when(col('admin1').startswith('09'), 'Education') + .when((col('admin1').startswith('16')) & (col('interest_tmp')==0), 'Health') + .when(col('admin1').startswith('09') & (col('interest_tmp')==0), 'Education') # No recreation and culture spending information # No infromation on social protection .otherwise('General public services') @@ -132,15 +186,19 @@ def boost_silver(): # wage bill when(col('econ2').startswith('21'), 'Wage bill') # capex - .when(col('econ1')=='Gasto De Capital', 'Capital expenditures') + .when(lower(col('econ1'))=='gasto de capital', 'Capital expenditures') # goods and services .when(col('econ2').startswith('22'), 'Goods and services') # subsidies - .when(col('subsidies')=='y', 'Subsidies') + .when((col('subsidies')=='y') & + (lower(col('econ3')) !="02 prestaciones de asistencia social") & + (lower(col('econ3')) !="01 prestaciones previsionales") & + (lower(col('econ3')) != "03 a otras entidades publicas") & + (lower(col('econ1')) !="gasto de capital"), 'Subsidies') # social benefits .when(col('econ_sub').isin('social assistance', 'pensions'), 'Social benefits') # Other grants and trasnfers - .when(lower(col('econ3'))=='03 a otras entidades publicas', 'Other grants and transfers') + .when((lower(col('econ3'))=='03 a otras entidades publicas') & (lower(col('econ1'))!='gasto de capital'), 'Other grants and transfers') # interest on debt .when(lower(col('econ3')).isin("03 intereses deuda interna", "04 intereses deuda externa", @@ -154,10 +212,14 @@ def boost_silver(): 'sheet', lit('Mun') ).withColumn( 'admin0_tmp', lit('Regional') - ).withColumn('geo1', lit('') + ).withColumn( + 'admin1_tmp', col('Region') + ).withColumn( + 'admin2_tmp', col('Municipio') + ).withColumn('geo1', col('Region') ).withColumn('func', - when(lower(col('service2'))=='area de salud', 'Health') - .when(trim(lower(col('service2')))=='area de educacion', 'Education') + when(lower(col('service'))=='area de salud', 'Health') + .when(trim(lower(col('service')))=='area de educacion', 'Education') .otherwise('General public services') ).withColumn('econ', # wage bill @@ -178,6 +240,7 @@ def boost_silver(): def boost_gold(): return (dlt.read(f'chl_boost_silver') .withColumn('country_name', lit(COUNTRY)) + #.filter(col('year') > 2008) .select('country_name', col('year').cast("integer"), 'approved', From fdacf3b47c97661ec5cc28b6f0145a22cd8733bc Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Fri, 20 Dec 2024 13:21:11 +0000 Subject: [PATCH 07/10] Added a filter condition to extract years only after 2008 (due to some discrepancies for the years 2007 and 2008, we omit them)) --- Chile/CHL_transform_load_dlt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py index 86f8313..83990f0 100644 --- a/Chile/CHL_transform_load_dlt.py +++ b/Chile/CHL_transform_load_dlt.py @@ -240,7 +240,7 @@ def boost_silver(): def boost_gold(): return (dlt.read(f'chl_boost_silver') .withColumn('country_name', lit(COUNTRY)) - #.filter(col('year') > 2008) + .filter(col('year') > 2008) .select('country_name', col('year').cast("integer"), 'approved', From 5babd77ac659ea9643ff902e2e74ff800b26df0c Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Thu, 16 Jan 2025 21:36:01 +0000 Subject: [PATCH 08/10] Removed unnecessary comments --- Chile/CHL_transform_load_dlt.py | 362 ++++++++++++++++++-------------- 1 file changed, 200 insertions(+), 162 deletions(-) diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py index 83990f0..dc01f1b 100644 --- a/Chile/CHL_transform_load_dlt.py +++ b/Chile/CHL_transform_load_dlt.py @@ -1,11 +1,14 @@ # Databricks notebook source import dlt import unicodedata -from pyspark.sql.functions import substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map +from pyspark.sql.functions import ( + substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map, coalesce +) from pyspark.sql.types import StringType -# Note DLT requires the path to not start with /dbfs -TOP_DIR = "/mnt/DAP/data/BOOSTProcessed" +# Note: DLT requires the path to not start with /dbfs +TOP_DIR = "/Volumes/prd_mega/sboost4/vboost4" +#TOP_DIR = "/mnt/DAP/data/BOOSTProcessed" INPUT_DIR = f"{TOP_DIR}/Documents/input/Countries" WORKSPACE_DIR = f"{TOP_DIR}/Workspace" COUNTRY = 'Chile' @@ -36,153 +39,193 @@ "XV": "Arica y Parinacota", "XVI": "Ñuble", } -region_mapping_expr = create_map([lit(key) for pair in region_mapping.items() for key in pair]) +region_mapping_expr = create_map( + [item for key, val in region_mapping.items() for item in (lit(key), lit(val))] +) -@dlt.expect_or_drop("year_not_null", "year IS NOT NULL") -@dlt.table(name=f'chl_boost_bronze_cen') + +@dlt.table(name='chl_boost_bronze_cen') def boost_bronze_cen(): - # Load the data from CSV return (spark.read - .format("csv") - .options(**CSV_READ_OPTIONS) - .option("inferSchema", "true") - .load(f'{COUNTRY_MICRODATA_DIR}/Cen.csv') - .withColumn("region", when(col("region").isNull(), "").otherwise(col("region").cast("string"))) - .withColumn("econ1", col("econ1").cast("string")) - .withColumn("transfer", col("transfer").cast("string")) - .withColumn("econ2", col("econ2").cast("string")) - .withColumn("region", region_mapping_expr.getItem(col("region"))) - .filter( - ( - (col('year') < 2017) & - ((lower(col('transfer')) == 'excluye consolidables') & (~col('econ2').rlike('^(32|30)'))) & - ( - (~col('econ2').rlike('^(28|34)')) | - (lower(col('econ3')).isin("03 intereses deuda interna", - "04 intereses deuda externa", - "05 otros gastos financieros deuda interna", - "06 otros gastos financieros deuda externa")) - ) - ) | - ((col('year') > 2016) & - ( - (~(lower(col('econ1')).isin('financiamiento', 'consolidable'))) | - (lower(col('econ3')).isin('04 intereses deuda externa', - '06 otros gastos financieros deuda externa', - '03 intereses deuda interna', - '05 otros gastos financieros deuda interna')) - ) - ) - ) - ) + .format("csv") + .options(**CSV_READ_OPTIONS) + .option("inferSchema", "true") + .load(f'{COUNTRY_MICRODATA_DIR}/Cen.csv')) -@dlt.expect_or_drop("year_not_null", "year IS NOT NULL") -@dlt.table(name=f'chl_boost_bronze_municipal') -def boost_bronze_municipal(): - df1 = (spark.read - .format("csv") - .options(**CSV_READ_OPTIONS) - .option("inferSchema", "true") - .load(f'{COUNTRY_MICRODATA_DIR}/Mun.csv') - .withColumn("econ0", when(col("econ0").isNull(), "").otherwise(col("econ0").cast("string"))) - .withColumn("econ1", when(col("econ1").isNull(), "").otherwise(col("econ1").cast("string"))) - .withColumn("econ2", when(col("econ2").isNull(), "").otherwise(col("econ2").cast("string"))) - .withColumn("econ3", when(col("econ3").isNull(), "").otherwise(col("econ3").cast("string"))) - .withColumn("econ4", when(col("econ4").isNull(), "").otherwise(col("econ4").cast("string"))) - .withColumn("Region", when(col("Region").isNull(), "").otherwise(col("Region").cast("string"))) - .filter((col('econ0') == '2 Gasto') & (lower(col('econ1')) != 'gastos por actividades de financiacion')) - .withColumnRenamed('accrued', 'executed') - .withColumnRenamed('Servicio', 'service') - .withColumnRenamed('Region', 'region') - ) - df2 = (spark.read - .format("csv") - .options(**CSV_READ_OPTIONS) - .option("inferSchema", "true") - .load(f'{COUNTRY_MICRODATA_DIR}/Mun2.csv') - .withColumn("ECON0", when(col("ECON0").isNull(), "").otherwise(col("ECON0").cast("string"))) - .withColumn("ECON1", when(col("ECON1").isNull(), "").otherwise(col("ECON1").cast("string"))) - .withColumn("ECON2", when(col("ECON2").isNull(), "").otherwise(col("ECON2").cast("string"))) - .withColumn("ECON3", when(col("ECON3").isNull(), "").otherwise(col("ECON3").cast("string"))) - .withColumn("ECON4", when(col("ECON4").isNull(), "").otherwise(col("ECON4").cast("string"))) - .withColumn("Region", when(col("Region").isNull(), "").otherwise(col("Region").cast("string"))) - .filter((col('ECON0') == '2 Gasto') & (lower(col('ECON1')) != 'gastos por actividades de financiacion')) - .withColumnRenamed('ACCRUED', 'executed') - .withColumnRenamed('Service2', 'service') - .withColumnRenamed('Region', 'region') - ) - df2 = df2.toDF(*[col.lower() for col in df2.columns]) - df = df1.unionByName(df2, allowMissingColumns=True).withColumn("region", region_mapping_expr.getItem(col("region"))) - return df -@dlt.table(name='chl_boost_silver') -def boost_silver(): - cen = dlt.read('chl_boost_bronze_cen' - ).withColumn('year', col('year').cast('long') +@dlt.table(name='chl_boost_bronze_mun_1') +def boost_bronze_mun_1(): + return (spark.read + .format("csv") + .options(**CSV_READ_OPTIONS) + .option("inferSchema", "true") + .load(f'{COUNTRY_MICRODATA_DIR}/Mun.csv')) + + +@dlt.table(name='chl_boost_bronze_mun_2') +def boost_bronze_mun_2(): + return (spark.read + .format("csv") + .options(**CSV_READ_OPTIONS) + .option("inferSchema", "true") + .load(f'{COUNTRY_MICRODATA_DIR}/Mun2.csv')) + + +@dlt.table(name='chl_boost_silver_mun') +def boost_silver_mun(): + df1 = (dlt.read('chl_boost_bronze_mun_1') + .withColumn("econ0", coalesce(col("econ0").cast("string"), lit(""))) + .withColumn("econ1", coalesce(col("econ1").cast("string"), lit(""))) + .withColumn("econ2", coalesce(col("econ2").cast("string"), lit(""))) + .withColumn("econ3", coalesce(col("econ3").cast("string"), lit(""))) + .withColumn("econ4", coalesce(col("econ4").cast("string"), lit(""))) + .withColumn("Region", coalesce(col("Region").cast("string"), lit(""))) + .filter((col('econ0') == '2 Gasto') & (lower(col('econ1')) != 'gastos por actividades de financiacion')) + .withColumnRenamed('accrued', 'executed') + .withColumnRenamed('Servicio', 'service') + .withColumnRenamed('Region', 'region')) + + df2 = (dlt.read('chl_boost_bronze_mun_2') + .withColumn("ECON0", coalesce(col("ECON0").cast("string"), lit(""))) + .withColumn("ECON1", coalesce(col("ECON1").cast("string"), lit(""))) + .withColumn("ECON2", coalesce(col("ECON2").cast("string"), lit(""))) + .withColumn("ECON3", coalesce(col("ECON3").cast("string"), lit(""))) + .withColumn("ECON4", coalesce(col("ECON4").cast("string"), lit(""))) + .withColumn("Region", coalesce(col("Region").cast("string"), lit(""))) + .filter((col('ECON0') == '2 Gasto') & (lower(col('ECON1')) != 'gastos por actividades de financiacion')) + .withColumnRenamed('ACCRUED', 'executed') + .withColumnRenamed('Service2', 'service') + .withColumnRenamed('Region', 'region') + .toDF(*[col.lower() for col in dlt.read('chl_boost_bronze_mun_2').columns])) + + df = df1.unionByName(df2, allowMissingColumns=True).withColumn("region", region_mapping_expr.getItem(col("region")) ).withColumn( - 'sheet', lit('Cen') + 'sheet', lit('Mun') ).withColumn( - 'admin0_tmp', lit('Central') + 'admin0_tmp', lit('Regional') ).withColumn( - 'admin1_tmp', when((col('region').isNull()) | (col('region') == ''), lit('Central Scope')).otherwise(col('region')) + 'admin1_tmp', col('Region') ).withColumn( - 'admin2_tmp', col('admin2') - ).withColumn('geo1', lit('Central Scope') - ).withColumn('interest_tmp', - when(lower(col('econ3')).isin('04 intereses deuda externa', + 'admin2_tmp', col('Municipio') + ).withColumn('geo1', col('Region') + ).withColumn('func', + when(lower(col('service'))=='area de salud', 'Health') + .when(trim(lower(col('service')))=='area de educacion', 'Education') + .otherwise('General public services') + ).withColumn('econ', + # wage bill + when(col('econ2').startswith('21'), 'Wage bill') + # capex + .when(lower(col('econ1'))=='gastos por actividades de inversion', 'Capital expenditures') + # goods and services + .when(col('econ2').startswith('22'), 'Goods and services') + # subsidies + .when(((lower(col('econ2'))=='24 transferencias corrientes') & (lower(col('econ3')) == '01 al sector privado')), 'Subsidies') + # interest on debt + .when(lower(col('econ3')).isin('03 intereses deuda interna', '05 otros gastos financieros deuda interna'), 'Interest on debt') + .otherwise('Other expenses') + ) + return df + + +@dlt.table(name='chl_boost_silver_cen') +def boost_silver_cen(): + return (dlt.read('chl_boost_bronze_cen') + .withColumn("region", when(col("region").isNull(), "").otherwise(col("region").cast("string"))) + .withColumn("econ1", col("econ1").cast("string")) + .withColumn("econ2", col("econ2").cast("string")) + .withColumn("transfer", col("transfer").cast("string")) + .withColumn("region", region_mapping_expr.getItem(col("region"))) + .filter( + ( + (col('year') <= 2016) & + ((lower(col('transfer')) == 'excluye consolidables') & (~col('econ2').rlike('^(32|30)'))) & + ( + (~col('econ2').rlike('^(28|34)')) | + (lower(col('econ3')).isin("03 intereses deuda interna", + "04 intereses deuda externa", + "05 otros gastos financieros deuda interna", + "06 otros gastos financieros deuda externa")) + ) + ) | + ((col('year') > 2016) & + ( + (~(lower(col('econ1')).isin('financiamiento', 'consolidable'))) | + (lower(col('econ3')).isin('04 intereses deuda externa', '06 otros gastos financieros deuda externa', '03 intereses deuda interna', - '05 otros gastos financieros deuda interna'), 1) - .otherwise(0) - ).withColumn('func_sub', - when((col('admin1').rlike('^(03|10)')) & (col('interest_tmp')==0), 'judiciary') - .when((col('admin1').startswith('05')) & (col('interest_tmp')==0), 'public safety') - # No breakdown of education spending into primary, secondary etc - # No breakdown of health expenditure into primary, secondary etc - # agriculture - .when((col('year')<2020) & (col('admin1').startswith('13') & (lower(col('admin2'))!='05 corporacion nacional forestal')), 'agriculture') - .when((col('year')>2019) & (col('admin1').startswith('13') & (~col('admin2').startswith('1305'))), 'agriculture') - # roads - .when(col('road_transport')=='road', 'road') - # railroads - .when((col('program1').rlike('^(190102|190103)') | (lower(col('program1')).isin("02 S.Y Adm.Gral.Transporte-Empresa Ferrocarriles Del Estado".lower(),"03 TRANSANTIAGO".lower()))), 'railroads') - # water transport - .when(lower(col('program1')) == '06 D.G.O.P.-Direccion De Obras Portuarias'.lower(), 'water transport') - # air transport - .when(lower(col('program1')) == '07 D.G.O.P.-Direccion De Aeropuertos'.lower(), 'air transport') - # energy - .when((((col('admin1').startswith('17'))&(lower(col('admin2')).isin("04 comision chilena de energia nuclear","05 comision nacional de energia")))| - (col('admin1').startswith('24'))), 'energy') - - ).withColumn('func', + '05 otros gastos financieros deuda interna')) + ) + )) + .withColumn('year', col('year').cast('long')) + .withColumn('sheet', lit('Cen')) + .withColumn('admin0_tmp', lit('Central')) + .withColumn( + 'admin1_tmp', + when((col('region').isNull()) | (col('region') == ''), lit('Central Scope')).otherwise(col('region'))) + .withColumn('admin2_tmp', col('admin2')) + .withColumn('geo1', lit('Central Scope')) + .withColumn( + 'interest_tmp', + when( + lower(col('econ3')).isin( + '04 intereses deuda externa', + '06 otros gastos financieros deuda externa', + '03 intereses deuda interna', + '05 otros gastos financieros deuda interna' + ), + 1 + ).otherwise(0)) + .withColumn( + 'func_sub', + when((col('admin1').rlike('^(03|10)')) & (col('interest_tmp') == 0), 'judiciary') + .when((col('admin1').startswith('05')) & (col('interest_tmp') == 0), 'public safety') + .when((col('year') <= 2019) & (col('admin1').startswith('13')) & (lower(col('admin2')) != '05 corporacion nacional forestal'), 'agriculture') + .when((col('year') > 2019) & (col('admin1').startswith('13')) & (~col('admin2').startswith('1305')), 'agriculture') + .when(col('road_transport') == 'road', 'road') + .when( + (col('program1').rlike('^(190102|190103)')) | (lower(col('program1')).isin( + '02 s.y adm.gral.transporte-empresa ferrocarriles del estado', + '03 transantiago' + )), + 'railroads') + .when(lower(col('program1')) == '06 d.g.o.p.-direccion de obras portuarias', 'water transport') + .when(lower(col('program1')) == '07 d.g.o.p.-direccion de aeropuertos', 'air transport') + .when( + ((col('admin1').startswith('17') & (lower(col('admin2')).isin('04 comision chilena de energia nuclear', '05 comision nacional de energia'))) + | col('admin1').startswith('24')), + 'energy' + )) + .withColumn( + 'func', when(col('admin1').startswith('11'), 'Defence') - .when(col('func_sub').isin('judiciary', 'public safety'), "Public order and safety") - .when((col('interest_tmp')==0) & ( - ((col('admin1').rlike('^(07|17)') & (~lower(col('admin2')).isin("04 comision chilena de energia nuclear", "05 comision nacional de energia")))) | - col('func_sub').isin('agriculture', 'road', 'railroads', 'water transport', 'air transport', 'energy') - ), 'Economic affairs') - .when(( - col('admin2').rlike('^(1305|2501|2502|2503)') | - col('admin2').isin('05 Corporacion Nacional Forestal', - '03 Tercer Tribunal Ambiental', - '03 Superintendencia Del Medio Ambiente', - '02 Segundo Tribunal Ambiental', - '02 Servicio De Evaluacion Ambiental', - '01 Subsecretaria De Medio Ambiente', - '01 Primer Tribunal Ambiental') - ), 'Environmental protection') + .when(col('func_sub').isin('judiciary', 'public safety'), 'Public order and safety') + .when( + (col('interest_tmp') == 0) & + (((col('admin1').rlike('^(07|17)') & (~lower(col('admin2')).isin('04 comision chilena de energia nuclear', '05 comision nacional de energia')))) + | col('func_sub').isin('agriculture', 'road', 'railroads', 'water transport', 'air transport', 'energy')), + 'Economic affairs') + .when( + (col('admin2').rlike('^(1305|2501|2502|2503)') | + col('admin2').isin( + '05 Corporacion Nacional Forestal', + '03 Tercer Tribunal Ambiental', + '03 Superintendencia Del Medio Ambiente', + '02 Segundo Tribunal Ambiental', + '02 Servicio De Evaluacion Ambiental', + '01 Subsecretaria De Medio Ambiente', + '01 Primer Tribunal Ambiental' + )), + 'Environmental protection') .when(col('admin1').startswith('18'), 'Housing and community amenities') - .when((col('admin1').startswith('16')) & (col('interest_tmp')==0), 'Health') - .when(col('admin1').startswith('09') & (col('interest_tmp')==0), 'Education') - # No recreation and culture spending information - # No infromation on social protection - .otherwise('General public services') - ).withColumn('econ_sub', - # social assistance + .when((col('admin1').startswith('16')) & (col('interest_tmp') == 0), 'Health') + .when(col('admin1').startswith('09') & (col('interest_tmp') == 0), 'Education') + .otherwise('General public services')) + .withColumn( + 'econ_sub', when(lower(col('econ3')) == '02 prestaciones de asistencia social', 'social assistance') - # pensions - .when(lower(col('econ3')) == '01 prestaciones previsionales', 'pensions') - ).withColumn('econ', + .when(lower(col('econ3')) == '01 prestaciones previsionales', 'pensions')) + .withColumn('econ', # wage bill when(col('econ2').startswith('21'), 'Wage bill') # capex @@ -206,36 +249,31 @@ def boost_silver(): "06 otros gastos financieros deuda externa"), 'Interest on debt') .otherwise('Other expenses') ) + ) - mun = dlt.read('chl_boost_bronze_municipal' - ).withColumn( - 'sheet', lit('Mun') - ).withColumn( - 'admin0_tmp', lit('Regional') - ).withColumn( - 'admin1_tmp', col('Region') - ).withColumn( - 'admin2_tmp', col('Municipio') - ).withColumn('geo1', col('Region') - ).withColumn('func', - when(lower(col('service'))=='area de salud', 'Health') - .when(trim(lower(col('service')))=='area de educacion', 'Education') - .otherwise('General public services') - ).withColumn('econ', - # wage bill - when(col('econ2').startswith('21'), 'Wage bill') - # capex - .when(lower(col('econ1'))=='gastos por actividades de inversion', 'Capital expenditures') - # goods and services - .when(col('econ2').startswith('22'), 'Goods and services') - # subsidies - .when(((lower(col('econ2'))=='24 transferencias corrientes') & (lower(col('econ3')) == '01 al sector privado')), 'Subsidies') - # interest on debt - .when(lower(col('econ3')).isin('03 intereses deuda interna', '05 otros gastos financieros deuda interna'), 'Interest on debt') - .otherwise('Other expenses') + +@dlt.table(name='chl_boost_silver') +def boost_silver(): + cen = (dlt.read('chl_boost_silver_cen') + .withColumn('sheet', lit('Cen')) + .withColumn('admin0_tmp', lit('Central')) + .withColumn( + 'admin1_tmp', + when((col('region').isNull()) | (col('region') == ''), lit('Central Scope')).otherwise(col('region')) ) + .withColumn('admin2_tmp', col('admin2')) + .withColumn('geo1', lit('Central Scope'))) + + mun = (dlt.read('chl_boost_silver_mun') + .withColumn('sheet', lit('Mun')) + .withColumn('admin0_tmp', lit('Regional')) + .withColumn('admin1_tmp', col('region')) + .withColumn('admin2_tmp', col('municipio')) + .withColumn('geo1', col('region'))) + return cen.unionByName(mun, allowMissingColumns=True) + @dlt.table(name=f'chl_boost_gold') def boost_gold(): return (dlt.read(f'chl_boost_silver') From 5b7770e7b939dcce2f330ad53bb9df593fe9802f Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Thu, 16 Jan 2025 21:37:29 +0000 Subject: [PATCH 09/10] Updated bronze to reflect raw data. Modifications to processing of the Municipal bronze and silver stages --- Chile/CHL_extract_microdata_excel_to_csv.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Chile/CHL_extract_microdata_excel_to_csv.py b/Chile/CHL_extract_microdata_excel_to_csv.py index c6a6a15..cba6477 100644 --- a/Chile/CHL_extract_microdata_excel_to_csv.py +++ b/Chile/CHL_extract_microdata_excel_to_csv.py @@ -16,16 +16,10 @@ csv_file_path = f'{microdata_csv_dir}/{sheet}.csv' df = pd.read_excel(filename, sheet_name=sheet, header=0) - # Handle unnamed or null named columns header = [col_name for col_name in df.columns if is_named_column(col_name)] df = df[header] df.columns = [col.strip() for col in header] - # Normalize cells df = df.applymap(normalize_cell) - - # Remove rows where all values are null df = df.dropna(how='all') - - # Write to CSV df.to_csv(csv_file_path, index=False, encoding='utf-8') From f6dece29708db73f57673dd7026bae7ff4e1ee0c Mon Sep 17 00:00:00 2001 From: bhupatiraju Date: Fri, 24 Jan 2025 12:54:52 +0000 Subject: [PATCH 10/10] Modified region_mapping_expr definition from list comprehension to loop for readability --- Chile/CHL_transform_load_dlt.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Chile/CHL_transform_load_dlt.py b/Chile/CHL_transform_load_dlt.py index dc01f1b..b602730 100644 --- a/Chile/CHL_transform_load_dlt.py +++ b/Chile/CHL_transform_load_dlt.py @@ -4,11 +4,9 @@ from pyspark.sql.functions import ( substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map, coalesce ) -from pyspark.sql.types import StringType # Note: DLT requires the path to not start with /dbfs TOP_DIR = "/Volumes/prd_mega/sboost4/vboost4" -#TOP_DIR = "/mnt/DAP/data/BOOSTProcessed" INPUT_DIR = f"{TOP_DIR}/Documents/input/Countries" WORKSPACE_DIR = f"{TOP_DIR}/Workspace" COUNTRY = 'Chile' @@ -39,9 +37,10 @@ "XV": "Arica y Parinacota", "XVI": "Ñuble", } -region_mapping_expr = create_map( - [item for key, val in region_mapping.items() for item in (lit(key), lit(val))] -) +region_mapping_list = [] +for key, val in region_mapping.items(): + region_mapping_list.extend([lit(key), lit(val)]) +region_mapping_expr = create_map(region_mapping_list) @dlt.table(name='chl_boost_bronze_cen')