Chile #34

Open
wants to merge 10 commits into main
31 changes: 31 additions & 0 deletions Chile/CHL_extract_microdata_excel_to_csv.py
@@ -0,0 +1,31 @@
# Databricks notebook source
# MAGIC %run ../utils

# COMMAND ----------

import pandas as pd
import numpy as np

COUNTRY = 'Chile'

microdata_csv_dir = prepare_microdata_csv_dir(COUNTRY)
filename = input_excel_filename(COUNTRY)
disaggregated_data_sheets = ['Cen', 'Mun', 'Mun2']

for sheet in tqdm(disaggregated_data_sheets):
    csv_file_path = f'{microdata_csv_dir}/{sheet}.csv'
    df = pd.read_excel(filename, sheet_name=sheet, header=0)

    # Handle unnamed or null named columns
    header = [col_name for col_name in df.columns if is_named_column(col_name)]
    df = df[header]
    df.columns = [col.strip() for col in header]

    # Normalize cells
    df = df.applymap(normalize_cell)
@elysenko elysenko Jan 15, 2025

The applymap function is deprecated; use the map function instead.

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.applymap.html

Contributor Author

I think there are multiple places across the countries where this has been used, so I'll clean it up along with those. For now, I am leaving it as is instead of looping over the columns and using map.
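
For reference, pandas 2.1 renamed the elementwise DataFrame.applymap to DataFrame.map, so the eventual cleanup should be a one-line change (a sketch, assuming pandas >= 2.1):

# pandas >= 2.1: DataFrame.map is the new name for elementwise applymap
df = df.map(normalize_cell)

# On older pandas, the column-by-column fallback the author mentions would be:
# df = df.apply(lambda s: s.map(normalize_cell))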


    # Remove rows where all values are null
    df = df.dropna(how='all')

    # Write to CSV
    df.to_csv(csv_file_path, index=False, encoding='utf-8')
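
The helpers used above (prepare_microdata_csv_dir, input_excel_filename, is_named_column, normalize_cell, tqdm) come from the ../utils notebook and are not part of this diff. For orientation, a hypothetical sketch of what is_named_column might do, assuming it rejects null headers and the 'Unnamed: N' placeholders pandas generates for blank header cells:

def is_named_column(col_name) -> bool:
    # Hypothetical: treat missing headers and pandas' auto-generated
    # 'Unnamed: N' placeholders as unnamed columns
    if col_name is None or (isinstance(col_name, float) and pd.isna(col_name)):
        return False
    return not str(col_name).startswith('Unnamed:')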
258 changes: 258 additions & 0 deletions Chile/CHL_transform_load_dlt.py
@@ -0,0 +1,258 @@
# Databricks notebook source
import dlt
import unicodedata
from pyspark.sql.functions import substring, col, lit, when, udf, trim, regexp_replace, initcap, concat, lower, create_map
from pyspark.sql.types import StringType

# Note DLT requires the path to not start with /dbfs
TOP_DIR = "/mnt/DAP/data/BOOSTProcessed"
INPUT_DIR = f"{TOP_DIR}/Documents/input/Countries"
WORKSPACE_DIR = f"{TOP_DIR}/Workspace"
COUNTRY = 'Chile'
COUNTRY_MICRODATA_DIR = f'{WORKSPACE_DIR}/microdata_csv/{COUNTRY}'

CSV_READ_OPTIONS = {
    "header": "true",
    "multiline": "true",  # quoted fields may span multiple lines
    "quote": '"',
    "escape": '"',  # a doubled quote escapes a quote inside a quoted field
}

region_mapping = {
    "IX": "Araucanía",
    "RM": "Región Metropolitana de Santiago",
    "I": "Tarapacá",
    "II": "Antofagasta",
    "III": "Atacama",
    "IV": "Coquimbo",
    "V": "Valparaíso",
    "VI": "Libertador General Bernardo O'Higgins",
    "VII": "Maule",
    "VIII": "Biobío",
    "X": "Los Lagos",
    "XI": "Aysén",
    "XII": "Magallanes y la Antártica Chilena",
    "XIV": "Los Ríos",
    "XV": "Arica y Parinacota",
    "XVI": "Ñuble",
}
region_mapping_expr = create_map([lit(key) for pair in region_mapping.items() for key in pair])
Contributor

nested list comprehension isn't the most readable. Try something like

region_mapping_expr = create_map(
    [lit(key), lit(val) for key, val in region_mapping.items()]
)

Contributor Author

This doesn't work since I need the alternating keys and values in a flattened list for create_map.
I modified it to be the following:

region_mapping_expr = create_map(
    [item for key, val in region_mapping.items() for item in (lit(key), lit(val))]
)

Does this read better?

Contributor

@weilu weilu Jan 19, 2025

Not really – I don't understand what it is trying to do. I'm generally not a fan of nested list comprehensions; I'd rather it all be expanded in a for loop. Up to you.
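
For comparison, the fully expanded loop the reviewer alludes to would look like this (equivalent to the comprehension above; create_map accepts a single list of alternating key and value columns):

map_args = []
for key, val in region_mapping.items():
    # create_map expects keys and values interleaved: key1, val1, key2, val2, ...
    map_args.append(lit(key))
    map_args.append(lit(val))
region_mapping_expr = create_map(map_args)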


@dlt.expect_or_drop("year_not_null", "year IS NOT NULL")
@dlt.table(name=f'chl_boost_bronze_cen')
def boost_bronze_cen():
    # Load the data from CSV
    return (spark.read
        .format("csv")
        .options(**CSV_READ_OPTIONS)
        .option("inferSchema", "true")
        .load(f'{COUNTRY_MICRODATA_DIR}/Cen.csv')
        .withColumn("region", when(col("region").isNull(), "").otherwise(col("region").cast("string")))
        .withColumn("econ1", col("econ1").cast("string"))
        .withColumn("transfer", col("transfer").cast("string"))
        .withColumn("econ2", col("econ2").cast("string"))
        .withColumn("region", region_mapping_expr.getItem(col("region")))
        .filter(
            (
                (col('year') < 2017) &
                ((lower(col('transfer')) == 'excluye consolidables') & (~col('econ2').rlike('^(32|30)'))) &
                (
                    (~col('econ2').rlike('^(28|34)')) |
                    (lower(col('econ3')).isin("03 intereses deuda interna",
                                              "04 intereses deuda externa",
                                              "05 otros gastos financieros deuda interna",
                                              "06 otros gastos financieros deuda externa"))
                )
            ) |
            (
                (col('year') > 2016) &
                (
                    (~(lower(col('econ1')).isin('financiamiento', 'consolidable'))) |
                    (lower(col('econ3')).isin('04 intereses deuda externa',
                                              '06 otros gastos financieros deuda externa',
                                              '03 intereses deuda interna',
                                              '05 otros gastos financieros deuda interna'))
                )
            )
        )
    )

@dlt.expect_or_drop("year_not_null", "year IS NOT NULL")
@dlt.table(name=f'chl_boost_bronze_municipal')
def boost_bronze_municipal():
    df1 = (spark.read
        .format("csv")
        .options(**CSV_READ_OPTIONS)
        .option("inferSchema", "true")
        .load(f'{COUNTRY_MICRODATA_DIR}/Mun.csv')
        .withColumn("econ0", when(col("econ0").isNull(), "").otherwise(col("econ0").cast("string")))
        .withColumn("econ1", when(col("econ1").isNull(), "").otherwise(col("econ1").cast("string")))
        .withColumn("econ2", when(col("econ2").isNull(), "").otherwise(col("econ2").cast("string")))
        .withColumn("econ3", when(col("econ3").isNull(), "").otherwise(col("econ3").cast("string")))
        .withColumn("econ4", when(col("econ4").isNull(), "").otherwise(col("econ4").cast("string")))
        .withColumn("Region", when(col("Region").isNull(), "").otherwise(col("Region").cast("string")))
        .filter((col('econ0') == '2 Gasto') & (lower(col('econ1')) != 'gastos por actividades de financiacion'))
        .withColumnRenamed('accrued', 'executed')
        .withColumnRenamed('Servicio', 'service')
        .withColumnRenamed('Region', 'region')
    )
    df2 = (spark.read
        .format("csv")
        .options(**CSV_READ_OPTIONS)
        .option("inferSchema", "true")
        .load(f'{COUNTRY_MICRODATA_DIR}/Mun2.csv')
Contributor


We can have two bronze_municipal tables, one loading from each Mun sheet, and then either two corresponding silver tables or a single one that reads from both bronze tables, processes them, and combines them. I feel like that would be cleaner and preserve data at each stage, as defined by the medallion architecture.

Contributor Author


I have updated the code to read mun1 and mun2 into their own bronze tables (with no transformations there). Then they are merged into chl_boost_bronze_mun.

In a similar manner, cen is read into chl_boost_bronze_cen with no modifications. Then, transformations are done to produce chl_boost_silver_cen.

Finally, chl_boost_silver_cen and chl_boost_silver_mun are merged to produce chl_boost_silver.

.withColumn("ECON0", when(col("ECON0").isNull(), "").otherwise(col("ECON0").cast("string")))
.withColumn("ECON1", when(col("ECON1").isNull(), "").otherwise(col("ECON1").cast("string")))
.withColumn("ECON2", when(col("ECON2").isNull(), "").otherwise(col("ECON2").cast("string")))
.withColumn("ECON3", when(col("ECON3").isNull(), "").otherwise(col("ECON3").cast("string")))
.withColumn("ECON4", when(col("ECON4").isNull(), "").otherwise(col("ECON4").cast("string")))
.withColumn("Region", when(col("Region").isNull(), "").otherwise(col("Region").cast("string")))
.filter((col('ECON0') == '2 Gasto') & (lower(col('ECON1')) != 'gastos por actividades de financiacion'))
.withColumnRenamed('ACCRUED', 'executed')
.withColumnRenamed('Service2', 'service')
.withColumnRenamed('Region', 'region')
)
df2 = df2.toDF(*[col.lower() for col in df2.columns])
df = df1.unionByName(df2, allowMissingColumns=True).withColumn("region", region_mapping_expr.getItem(col("region")))
return df
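
The restructured layering described in the review thread above is not shown in this diff. A minimal sketch of what it could look like, using the table names from the reply (the function names and the abbreviated load logic are assumptions):

@dlt.table(name='chl_boost_bronze_mun1')
def boost_bronze_mun1():
    # Raw load of the Mun sheet, no transformations at the bronze stage
    return (spark.read.format("csv").options(**CSV_READ_OPTIONS)
            .option("inferSchema", "true")
            .load(f'{COUNTRY_MICRODATA_DIR}/Mun.csv'))

@dlt.table(name='chl_boost_bronze_mun2')
def boost_bronze_mun2():
    # Raw load of the Mun2 sheet, no transformations at the bronze stage
    return (spark.read.format("csv").options(**CSV_READ_OPTIONS)
            .option("inferSchema", "true")
            .load(f'{COUNTRY_MICRODATA_DIR}/Mun2.csv'))

@dlt.table(name='chl_boost_bronze_mun')
def boost_bronze_mun():
    # Merge the two sheet-level tables; lower-case Mun2's headers first
    # so unionByName can align columns across the differing schemas
    df2 = dlt.read('chl_boost_bronze_mun2')
    df2 = df2.toDF(*[c.lower() for c in df2.columns])
    return dlt.read('chl_boost_bronze_mun1').unionByName(df2, allowMissingColumns=True)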

@dlt.table(name='chl_boost_silver')
def boost_silver():
    cen = dlt.read('chl_boost_bronze_cen'
    ).withColumn('year', col('year').cast('long')
    ).withColumn('sheet', lit('Cen')
    ).withColumn('admin0_tmp', lit('Central')
    ).withColumn('admin1_tmp', when((col('region').isNull()) | (col('region') == ''), lit('Central Scope')).otherwise(col('region'))
    ).withColumn('admin2_tmp', col('admin2')
    ).withColumn('geo1', lit('Central Scope')
    ).withColumn('interest_tmp',
        when(lower(col('econ3')).isin('04 intereses deuda externa',
                                      '06 otros gastos financieros deuda externa',
                                      '03 intereses deuda interna',
                                      '05 otros gastos financieros deuda interna'), 1)
        .otherwise(0)
    ).withColumn('func_sub',
        when((col('admin1').rlike('^(03|10)')) & (col('interest_tmp') == 0), 'judiciary')
        .when((col('admin1').startswith('05')) & (col('interest_tmp') == 0), 'public safety')
        # No breakdown of education spending into primary, secondary etc
        # No breakdown of health expenditure into primary, secondary etc
        # agriculture
        .when((col('year') < 2020) & (col('admin1').startswith('13') & (lower(col('admin2')) != '05 corporacion nacional forestal')), 'agriculture')
        .when((col('year') > 2019) & (col('admin1').startswith('13') & (~col('admin2').startswith('1305'))), 'agriculture')
        # roads
        .when(col('road_transport') == 'road', 'road')
        # railroads
        .when((col('program1').rlike('^(190102|190103)') | (lower(col('program1')).isin("02 S.Y Adm.Gral.Transporte-Empresa Ferrocarriles Del Estado".lower(), "03 TRANSANTIAGO".lower()))), 'railroads')
        # water transport
        .when(lower(col('program1')) == '06 D.G.O.P.-Direccion De Obras Portuarias'.lower(), 'water transport')
        # air transport
        .when(lower(col('program1')) == '07 D.G.O.P.-Direccion De Aeropuertos'.lower(), 'air transport')
        # energy
        .when((((col('admin1').startswith('17')) & (lower(col('admin2')).isin("04 comision chilena de energia nuclear", "05 comision nacional de energia"))) |
               (col('admin1').startswith('24'))), 'energy')
    ).withColumn('func',
        when(col('admin1').startswith('11'), 'Defence')
        .when(col('func_sub').isin('judiciary', 'public safety'), 'Public order and safety')
        .when((col('interest_tmp') == 0) & (
            ((col('admin1').rlike('^(07|17)') & (~lower(col('admin2')).isin("04 comision chilena de energia nuclear", "05 comision nacional de energia")))) |
            col('func_sub').isin('agriculture', 'road', 'railroads', 'water transport', 'air transport', 'energy')
        ), 'Economic affairs')
        .when((
            col('admin2').rlike('^(1305|2501|2502|2503)') |
            col('admin2').isin('05 Corporacion Nacional Forestal',
                               '03 Tercer Tribunal Ambiental',
                               '03 Superintendencia Del Medio Ambiente',
                               '02 Segundo Tribunal Ambiental',
                               '02 Servicio De Evaluacion Ambiental',
                               '01 Subsecretaria De Medio Ambiente',
                               '01 Primer Tribunal Ambiental')
        ), 'Environmental protection')
        .when(col('admin1').startswith('18'), 'Housing and community amenities')
        .when((col('admin1').startswith('16')) & (col('interest_tmp') == 0), 'Health')
        .when(col('admin1').startswith('09') & (col('interest_tmp') == 0), 'Education')
        # No recreation and culture spending information
        # No information on social protection
        .otherwise('General public services')
    ).withColumn('econ_sub',
        # social assistance
        when(lower(col('econ3')) == '02 prestaciones de asistencia social', 'social assistance')
        # pensions
        .when(lower(col('econ3')) == '01 prestaciones previsionales', 'pensions')
    ).withColumn('econ',
        # wage bill
        when(col('econ2').startswith('21'), 'Wage bill')
        # capex
        .when(lower(col('econ1')) == 'gasto de capital', 'Capital expenditures')
        # goods and services
        .when(col('econ2').startswith('22'), 'Goods and services')
        # subsidies
        .when((col('subsidies') == 'y') &
              (lower(col('econ3')) != "02 prestaciones de asistencia social") &
              (lower(col('econ3')) != "01 prestaciones previsionales") &
              (lower(col('econ3')) != "03 a otras entidades publicas") &
              (lower(col('econ1')) != "gasto de capital"), 'Subsidies')
        # social benefits
        .when(col('econ_sub').isin('social assistance', 'pensions'), 'Social benefits')
        # Other grants and transfers
        .when((lower(col('econ3')) == '03 a otras entidades publicas') & (lower(col('econ1')) != 'gasto de capital'), 'Other grants and transfers')
        # interest on debt
        .when(lower(col('econ3')).isin("03 intereses deuda interna",
                                       "04 intereses deuda externa",
                                       "05 otros gastos financieros deuda interna",
                                       "06 otros gastos financieros deuda externa"), 'Interest on debt')
        .otherwise('Other expenses')
    )

    mun = dlt.read('chl_boost_bronze_municipal'
    ).withColumn('sheet', lit('Mun')
    ).withColumn('admin0_tmp', lit('Regional')
    ).withColumn('admin1_tmp', col('Region')
    ).withColumn('admin2_tmp', col('Municipio')
    ).withColumn('geo1', col('Region')
    ).withColumn('func',
        when(lower(col('service')) == 'area de salud', 'Health')
        .when(trim(lower(col('service'))) == 'area de educacion', 'Education')
        .otherwise('General public services')
    ).withColumn('econ',
        # wage bill
        when(col('econ2').startswith('21'), 'Wage bill')
        # capex
        .when(lower(col('econ1')) == 'gastos por actividades de inversion', 'Capital expenditures')
        # goods and services
        .when(col('econ2').startswith('22'), 'Goods and services')
        # subsidies
        .when(((lower(col('econ2')) == '24 transferencias corrientes') & (lower(col('econ3')) == '01 al sector privado')), 'Subsidies')
        # interest on debt
        .when(lower(col('econ3')).isin('03 intereses deuda interna', '05 otros gastos financieros deuda interna'), 'Interest on debt')
        .otherwise('Other expenses')
    )
    return cen.unionByName(mun, allowMissingColumns=True)

@dlt.table(name=f'chl_boost_gold')
def boost_gold():
    return (dlt.read(f'chl_boost_silver')
        .withColumn('country_name', lit(COUNTRY))
        .filter(col('year') > 2008)
        .select('country_name',
                col('year').cast("integer"),
                'approved',
                col('modified').alias('revised'),
                'executed',
                'geo1',
                col('admin0_tmp').alias('admin0'),
                col('admin1_tmp').alias('admin1'),
                'admin2',
                'func_sub',
                'func',
                'econ_sub',
                'econ')
    )
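
Once the pipeline has run, the gold table can be smoke-tested with an ad hoc aggregation (a hypothetical example; the column choices are for illustration only):

# Hypothetical check: total executed spending by year and functional category
spark.sql("""
    SELECT year, func, SUM(executed) AS total_executed
    FROM chl_boost_gold
    GROUP BY year, func
    ORDER BY year, func
""").show()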

2 changes: 1 addition & 1 deletion cross_country_aggregate_dlt.py
@@ -5,7 +5,7 @@
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

# Adding a new country requires adding the country here
-country_codes = ['moz', 'pry', 'ken', 'pak', 'bfa', 'col', 'cod', 'nga', 'tun', 'btn', 'bgd', 'alb', 'ury', "zaf"]
+country_codes = ['moz', 'pry', 'ken', 'pak', 'bfa', 'col', 'cod', 'nga', 'tun', 'btn', 'bgd', 'alb', 'ury', "zaf", 'chl']

schema = StructType([
StructField("country_name", StringType(), True, {'comment': 'The name of the country for which the budget data is recorded (e.g., "Kenya", "Brazil").'}),