-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconstr-treatment-groups.py
495 lines (359 loc) · 17 KB
/
constr-treatment-groups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# Databricks notebook source
# MAGIC %md
# MAGIC # Sample: getting treatment group for each card
# MAGIC
# MAGIC In the sample, we have regular users at some point in Jan 2022 - July 2024:
# MAGIC - 4538 anonymous cards --> these will be excluded from the analysis as in our linked data we do not have purely anonymous cards
# MAGIC - 4172 adulto cards --> these are the Never group. The challenge is that we cannot distinguish
# MAGIC - Who was never eligible and not vulnerable
# MAGIC - Who was never eligible and vulnerable
# MAGIC - Who was eligible at some point and signed up
# MAGIC - 6179 apoyo that paid subsidy at some point in time [WHICH PERIOD]---> get a dataset on having the subsidy each month and each period
# MAGIC - hadlost23: 1, 0, 0
# MAGIC - hadlost24: 1, 1, 0
# MAGIC - hadkept: 1, 1, 1
# MAGIC - gained: 0, 0, 1 or 0, 1, 1
# MAGIC
# MAGIC Replicating the steps for our linked data (https://github.com/dime-worldbank/Colombia-BRT-IE/blob/development/Fare%20Experiment%202022%20Project/Data%20Analysis/DataWork/Master%20Data/Jupyters/gcloud_linked_data_TM/constr_SurveyImpact-treatment-groups.ipynb) as closely as possible.
# MAGIC
# MAGIC
# MAGIC The most important difference is that this analysis can be only done at the **card** level, and not at the ID level.
# MAGIC
# MAGIC
# COMMAND ----------
# MAGIC %md
# MAGIC ## Set up
# COMMAND ----------
# Directories
import os
# Root of the project data as seen by the Spark APIs (DBFS mount point).
pathdb = '/mnt/DAP/data/ColombiaProject-TransMilenioRawData/'
# Same root as seen by local-file APIs through the /dbfs FUSE mount.
# NOTE(review): '/dbfs/' + '/mnt/...' yields '/dbfs//mnt/...'; the doubled
# slash is harmless on POSIX paths but looks unintended.
path = '/dbfs/' + pathdb
# Assumes the first folder under /Workspace/Repos belongs to the current
# user - TODO confirm this holds on shared workspaces.
user = os.listdir('/Workspace/Repos')[0]
git = '/Workspace/Repos/' +user+ '/ColombiaTransMilenio/'
git2 = '/Workspace/Repos/' +user+ '/Colombia-BRT_IE-temp/'
## Important sub-directories for this notebook
byheader_dir = path + '/Workspace/Raw/byheader_dir/'
# COMMAND ----------
# choose sample to use
# Suffix of the pre-built sample files loaded below ("_sample10" is
# presumably the 10% sample and "_sample1" the 1% sample - confirm).
samplesize = "_sample10"
#samplesize = "_sample1"
# COMMAND ----------
# Pip install non-standard packages
# NOTE(review): several of these (geopandas, folium, scikit-mobility, ...)
# are not used in the visible part of this notebook - presumably required
# by the %run helper scripts below; confirm before pruning.
!pip install rarfile
!pip install findspark
!pip install pyspark
!pip install plotly
!pip install pyspark_dist_explore
!pip install geopandas
!pip install seaborn
!pip install folium
!pip install editdistance
!pip install scikit-mobility
!pip install chart_studio
!pip install tqdm
!pip install pyunpack
!pip install patool
import shutil
import sys
# MAGIC
# Bring helper functions into the notebook namespace (these scripts define
# import_test_function and import_test_packages, among others).
%run ./utils/import_test.py
%run ./utils/packages.py
# Functions
# Smoke-test that the %run scripts loaded correctly.
# NOTE(review): the message mentions hola.py but the script run above is
# import_test.py - the message text may be stale.
import_test_function("Running hola.py works fine :)")
import_test_packages("Running packages.py works fine :)")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Import sample data
# COMMAND ----------
# days with missing data
# NOTE(review): days_missing is defined here but not referenced anywhere in
# the visible notebook - presumably kept for documentation or other notebooks.
days_missing = ['2022-09-16', '2022-09-17', '2022-09-18', '2022-09-19',
'2022-09-20', '2023-10-29', '2023-11-26', '2023-12-03',
'2023-12-24', '2023-12-25', '2024-02-03', '2024-02-06',
'2024-02-08', '2024-02-09', '2024-02-26']
# The following include all the values in the data
# Fare values (presumably COP) used below to tag each validation as
# subsidized (price_subsidy_*) or full fare (price_full_*). The tagging
# code splits on 2022-01-31: pre-2022 lists vs post-2022 lists.
price_subsidy_18 = [1575, 1725]
price_subsidy_22 = [1650, 1800] # same since Feb 2019
price_subsidy_23 = [2250, 2500] # same for 2024, though since Feb tariff unified to 2500
price_full_17 = [2000]
price_full_18 = [2100, 2300]
price_full_19 = [2200, 2400]
price_full_20 = [2300, 2500] # careful as 2500 is repeated in the 2023 subsidy values
price_full_22 = [2450, 2650]
price_full_23 = [2750, 2950] # same for 2024, though since Feb tariff unified to 2950
# COMMAND ----------
# Exploratory: confirm the DBFS mount is visible from the driver.
os.listdir('/dbfs/mnt/DAP')
# COMMAND ----------
# Exploratory: list the available datasets (the f-prefix has no placeholders).
os.listdir(os.path.join(path,f'Workspace/bogota-hdfs/'))
# COMMAND ----------
# Load the cleaned relevant-validations sample (Spark mount path, not the
# /dbfs FUSE path) and cache it: it is scanned several times below.
df = spark.read.format("parquet").load(os.path.join(pathdb,f'Workspace/bogota-hdfs/df_clean_relevant{samplesize}'))
df.cache()
# COMMAND ----------
# MAGIC %md
# MAGIC ## 1. Get subsidy status per card per month
# MAGIC
# MAGIC 1. Keep all data until month when Follow Up 1 survey finished (March 2024)
# MAGIC
# MAGIC 2. Discard Adulto Mayor and Discapacidad cards (already done because of the way the sample was built)
# MAGIC
# MAGIC 3. Discard transfers
# MAGIC
# MAGIC 4. Tag each validacion as being a subsidy transaction or not
# COMMAND ----------
# Filter dates
# NOTE(review): the markdown above says "until March 2024" but the cut-off
# here is Oct 2024 - confirm which is intended.
falldf = df.filter(F.col("day") < "2024-10-01") # LAST MONTH AVAILABLE
# Correct the year variable
# NOTE(review): this casts the existing `year` column to timestamp and
# re-extracts the year - presumably `year` held full dates upstream; verify.
falldf = falldf.withColumn("year", F.year(F.col("year").cast("timestamp")))
# Filter out rows with value <= 200
falldf = falldf.filter(F.col("value") > 200)
# Tag each validation as subsidized (1) or full fare (0) from its fare
# value, with the price lists split at 2022-01-31. The date split also
# disambiguates overlapping values (e.g. 2500 is full fare pre-2022 but a
# subsidy fare from 2023). Values matching neither list are left NULL
# (F.when chain with no .otherwise).
falldf = falldf.withColumn(
"subsidy",
F.when( # After 2022
(F.col("value").isin(price_subsidy_22 + price_subsidy_23)) &
(F.col("day") > "2022-01-31"), 1
).when( # After 2022
(F.col("value").isin(price_full_22 + price_full_23)) &
(F.col("day") > "2022-01-31"), 0
).when( # Before 2022
(F.col("value").isin(price_subsidy_18 + price_subsidy_22)) &
(F.col("day") <= "2022-01-31"), 1
).when( # Before 2022
(F.col("value").isin(price_full_17 + price_full_18 + price_full_19 +
price_full_20 + price_full_22)) &
(F.col("day") <= "2022-01-31"), 0
)
)
# mark if they are trips and NOT transfers (transfers are the values <= 500)
falldf = falldf.withColumn("full_trip", (F.col("value") > 500).cast("int"))
# fare paid on full trips only (NULL on transfers), for per-trip averages
falldf = falldf.withColumn(
"value_full_trip",
F.when(F.col("full_trip") == 0, None).otherwise(F.col("value"))
)
# to count all validaciones
falldf = falldf.withColumn("constant_one", F.lit(1))
# COMMAND ----------
# Stats [NOT RUNNING BECAUSE IT TAKES LONG]
# Descriptive statistics on the sample before/after the filters above.
# Get the maximum day actually present (sanity check on the date filter)
max_day = falldf.agg(F.max("day")).collect()[0][0]
print(f"Max day: {max_day}")
# Cards in the raw sample, BEFORE the date/value filters.
# Fix: the original referenced an undefined name `alldf` (NameError); the
# unfiltered sample DataFrame in this notebook is `df`.
num_cards = df.select("cardnumber").distinct().count()
print(f"Cards in sample: {num_cards}")
# Cards in sample after filter
num_cards_after = falldf.select("cardnumber").distinct().count()
print(f"Cards in sample after filter: {num_cards_after}")
# Total validations
total_validations = falldf.count()
print(f"Total validations: {total_validations}")
# Validations whose fare matched none of the price lists (subsidy IS NULL)
num_null_subsidy = falldf.filter(F.col("subsidy").isNull()).count()
print(f"Number of null subsidy values: {num_null_subsidy}")
# Share of null subsidy tags among anonymous-profile validations
anonymous_null_percentage = falldf.filter(F.col("profile_final") == "anonymous") \
    .filter(F.col("subsidy").isNull()) \
    .count() / falldf.filter(F.col("profile_final") == "anonymous").count() * 100
print(f"Percentage of null subsidies for anonymous profiles: {anonymous_null_percentage:.2f}%")
# Distinct (year, value, system, profile) combinations left untagged
falldf.filter(F.col("subsidy").isNull()) \
    .select("year", "value", "system", "profile_final") \
    .distinct() \
    .show()
# COMMAND ----------
# Get number and % of subsidy trips each month
# get total validaciones, total full trips and avg value of full trips by month
# Collapse validations to a card x month panel. subsidy_mean is the share
# of subsidy-tagged validations (Spark mean/sum ignore NULL subsidy tags).
dm = falldf.groupBy("cardnumber", "profile_final", "month") \
.agg(
F.mean("subsidy").alias("subsidy_mean"),
F.sum("subsidy").alias("subsidy_sum"),
F.sum("constant_one").alias("n_validaciones"),
F.sum("full_trip").alias("n_trips"),
F.mean("value_full_trip").alias("mean_value_trip")
)
# Materialize on the driver; the rest of the notebook is pandas-based.
dm = dm.toPandas()
# COMMAND ----------
# subsidy that month rule
# A card "has the subsidy" in a month if at least 40% of its validations
# were at the subsidized fare, or if it logged exactly 30 subsidized trips.
dm["subsidy_month"] = 0
dm.loc[dm.subsidy_mean >= 0.4, "subsidy_month" ] = 1 # 40% subsidy trips or more
# NOTE(review): `== 30` tags only cards with exactly 30 subsidized trips -
# presumably the monthly cap; confirm `>= 30` was not intended.
dm.loc[dm.subsidy_sum == 30, "subsidy_month" ] = 1 # 30 subsidy trips
print(dm.columns)
print(dm.subsidy_month.isnull().sum())
# COMMAND ----------
# Rows = card-months in the panel
dm.shape
# COMMAND ----------
# Latest month present (sanity check against the date filter)
dm.month.max()
# COMMAND ----------
# Persist the monthly card panel; section 2 reloads it from this file.
# NOTE(review): 'Construct//' has a doubled slash - harmless on POSIX.
dm.to_csv(os.path.join(path, 'Workspace/Construct//monthly-valid-subsidy-bycard'+ samplesize + '.csv'), index=False)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Subsidy per card per month: plots & stats
# MAGIC - Plots of cards with subsidy per month.
# MAGIC - Having subsidy in any month versus card profile profiles
# COMMAND ----------
# Card-level flag: had the subsidy in ANY month of Jan-2022..Sep-2024.
subsidy_anymonth = dm[(dm.month > "2021-12-31") & (dm.month < "2024-10-01")].groupby("cardnumber", as_index = False).agg({"subsidy_month" : "max"})
subsidy_anymonth.columns = ["cardnumber", "subsidy_anymonth"]
# Merge the flag back onto the monthly panel (one row per card-month).
subsidy_anymonth = dm.merge(subsidy_anymonth,
on = "cardnumber",
how = "left")
fig, axes = plt.subplots(nrows=1,ncols=1, figsize = (10, 5))
fig.subplots_adjust(hspace = 0.4)
# % of cards with a subsidy month, among cards travelling that month that
# had the subsidy in any month.
# Fix: mask with subsidy_anymonth's own columns instead of Boolean Series
# taken from `dm` (same values today because the left-merge preserves row
# order, but cross-frame masks raise pandas' reindexing warning and break
# silently if row order ever diverges).
perc_subsidy = subsidy_anymonth[ (subsidy_anymonth.month > "2021-12-31") & (subsidy_anymonth.month < "2024-12-01") & (subsidy_anymonth.subsidy_anymonth == 1)].groupby(["month"]).agg({"subsidy_month": lambda x: np.mean(x)*100})
# NOTE(review): this window ends at "2024-12-01" while the data were cut at
# "2024-10-01" above - the later bound is inert but confirm it is intended.
perc_subsidy.subsidy_month.plot(title = f"NON-LINKED DATA \n Percentage of CARDS with subsidy trips each month \n over CARDS travelling that month that had the subsidy ANY month Jan22-Nov24 \n {samplesize}")
plt.xlabel("Month")
plt.ylabel("%")
plt.ylim(0, 100)
plt.grid()
plt.show()
# COMMAND ----------
# Collapse to one row per card: profile + had-subsidy-any-month flag.
subsidy_anymonth = subsidy_anymonth[["cardnumber", "profile_final", "subsidy_anymonth"]].drop_duplicates()
print(subsidy_anymonth.shape)
print(subsidy_anymonth.cardnumber.nunique())
# Card profiles vs. having had the subsidy in any month.
pd.crosstab(subsidy_anymonth.profile_final, subsidy_anymonth.subsidy_anymonth)
# COMMAND ----------
# MAGIC %md
# MAGIC ## 2. Having the subsidy each period and final subsidy group
# MAGIC
# MAGIC Output `by_card`: dataset at the card level, long dataset at the card-period level (missing rows if a card is not present that period), stating whether the card have the subsidy each period and some extra info
# COMMAND ----------
# Reload the monthly card panel written in section 1.
dm = pd.read_csv(os.path.join(path, 'Workspace/Construct//monthly-valid-subsidy-bycard'+ samplesize + '.csv'))
# Tag periods:
#   0 = before the policy change (Jan 2022 - Jan 2023)
#   1 = 2023 (Mar - Dec; Feb 2023 is left out - TODO confirm intentional gap)
#   2 = 2024 (Jan - Nov)
# Months outside these windows keep period = NaN and are dropped later.
dm["period"] = np.nan  # fix: the np.NaN alias was removed in NumPy 2.0
dm.loc[(dm.month >= "2022-01-01") & (dm.month <= "2023-01-31"), "period"] = 0
dm.loc[(dm.month >= "2023-03-01") & (dm.month <= "2023-12-31"), "period"] = 1
# fix: "2024-11-31" is not a real date; use the actual end of November
# (the string comparison result is unchanged for valid month values).
dm.loc[(dm.month >= "2024-01-01") & (dm.month <= "2024-11-30"), "period"] = 2
print(dm.period.value_counts())
# COMMAND ----------
# Latest month present in the reloaded panel (sanity check).
dm.month.max()
# COMMAND ----------
# Tag cards present in each period and report coverage shares.
tot_cards = dm.cardnumber.nunique()
# One set of card ids per period, plus boolean presence columns on dm.
cards_t0 = set(dm.loc[dm.period == 0, "cardnumber"])
cards_t1 = set(dm.loc[dm.period == 1, "cardnumber"])
cards_t2 = set(dm.loc[dm.period == 2, "cardnumber"])
for _p, _members in ((0, cards_t0), (1, cards_t1), (2, cards_t2)):
    dm[f"cards_t{_p}"] = dm.cardnumber.isin(_members)
# Set algebra over the three periods.
cards_always = cards_t0 & cards_t1 & cards_t2
cards_after = cards_t1 | cards_t2
cards_t0_after = cards_after & cards_t0
cards_any = cards_after | cards_t0
def _pct(group, ndigits=None):
    """Share of all cards (%), rounded to an int or to `ndigits` decimals."""
    share = len(group) / tot_cards * 100
    return round(share) if ndigits is None else round(share, ndigits)
print(f"Cards present before (in t0): {len(cards_t0)} cards, {_pct(cards_t0)}%")
print(f"Cards present after (in either t1 or t2): {len(cards_after)} cards, {_pct(cards_after)}%")
print(f"Cards present before and after (in either t1 or t2, and t0): {len(cards_t0_after)} cards, {_pct(cards_t0_after)}%")
print(f"Cards present in t1: {len(cards_t1)} cards, {_pct(cards_t1)}%")
print(f"Cards present in t2: {len(cards_t2)} cards, {_pct(cards_t2)}%")
print(f"Cards present in all three periods: {len(cards_always)}, {_pct(cards_always)}%")
print(f"Cards present in any of the periods: {len(cards_any)}, {_pct(cards_any, 2)}%")
# COMMAND ----------
# Keep cards in any of the periods
dm_inperiod = dm[dm.period.notnull()].reset_index(drop = True)
# dataset with last month in each period
# NOTE(review): tail(1) picks the last ROW per (period, card) - this is the
# last month only if rows are month-ordered within each card; confirm that
# ordering survives the CSV round-trip.
last = dm_inperiod.groupby(["period","cardnumber"], as_index = False).tail(1)
# Prepare data for corrections
# % of months
check = dm_inperiod.groupby(["period","cardnumber"],
as_index = False).agg({"subsidy_month": ["mean","sum"],
"month": ["nunique", "min", "max"]})
# Flatten the MultiIndex columns produced by the dict-of-lists agg.
check.columns = ["period", "cardnumber",
"mean_m_subsidy", "sum_m_subsidy",
"n_months", "min_month", "max_month"]
check["perc_m_subsidy"] = check.mean_m_subsidy * 100
# dataset by card and period with all info
by_card_period = last.merge(check,
on = ["period","cardnumber"],
how = "left")
del check, last
# Start from the last-month status; corrected below for t0.
by_card_period["subsidy_month_corrected"] = by_card_period.subsidy_month
# CORRECTIONS
## t0: based on % of months with or without subsidy in the period
by_card_period_t0 = by_card_period[by_card_period.period == 0].reset_index(drop = True)
print("% of months with subsidy for those WITH subsidy in the last month:")
print("- People with less than 100%:",
np.sum (by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 1] < 100),
np.mean(by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 1] < 100) * 100)
print("- People with less than 50%:",
np.sum (by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 1] < 50),
np.mean(by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 1] < 50) * 100)
print("---")
print("% of months with subsidy for those WITHOUT subsidy in the last month:")
print("- People with any month:",
round(np.sum (by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 0] > 0)),
round(np.mean(by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 0] > 0) * 100, 2), "%")
# NOTE(review): rounding the integer count to 2 decimals below is a no-op.
print("- People with more than 50%:",
round(np.sum (by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 0] > 50), 2),
round(np.mean(by_card_period_t0.perc_m_subsidy[by_card_period_t0.subsidy_month == 0] > 50)* 100, 2), "%")
# Those without subsidy in the last month before the policy change but more than 50% of months: they will be considered as having the subsidy before the policy change.
by_card_period.loc[ (by_card_period.period == 0) &
(by_card_period.perc_m_subsidy > 50) &
(by_card_period.subsidy_month == 0),
"subsidy_month_corrected" ] = 1
by_card_period = by_card_period[['cardnumber', 'period',
'subsidy_month', 'subsidy_month_corrected',
'sum_m_subsidy', 'mean_m_subsidy', 'perc_m_subsidy']]
# Corrected vs. raw last-month status (off-diagonal = corrections applied).
pd.crosstab(by_card_period.subsidy_month, by_card_period.subsidy_month_corrected)
# COMMAND ----------
# Reshaping from long to wide
# One row per card, one column per period with the corrected subsidy status.
s = by_card_period.pivot(index="cardnumber",
columns="period",
values="subsidy_month_corrected")
s.reset_index(inplace=True)
s.columns.name = None # Remove the name 'period' from columns
# Rename the pivot columns 0/1/2 -> subsidy_t0/t1/t2.
# NOTE(review): hard-coding four names assumes all three periods occur in
# the data - pivot would emit fewer columns otherwise; confirm.
s.columns = ["cardnumber", "subsidy_t0", "subsidy_t1", "subsidy_t2"]
s["subsidy_any_period"] = ( (s.subsidy_t0 == 1) | (s.subsidy_t1 == 1) | (s.subsidy_t2 == 1) ) *1
# check we haven't lost cards when pivoting
print("OK:", by_card_period.cardnumber.nunique() == s.cardnumber.nunique())
# Add dataset with some info at the card level
bycard = dm[["cardnumber", "profile_final", "cards_t0", "cards_t1" , "cards_t2"]].drop_duplicates()
# One row per card expected; the printed check should be True.
print("OK:",bycard.shape[0] == bycard.cardnumber.nunique())
s = s.merge(bycard,
on = "cardnumber",
how = "left")
print(pd.crosstab(s.profile_final, s.cards_t0))
print(pd.crosstab(s.profile_final, s.subsidy_any_period))
del by_card_period, bycard
# COMMAND ----------
# MAGIC %md
# MAGIC We have some anonymous with subsidy! However, we will just use the adulto ones as a comparison. Also, we have 288 apoyo with no subsidy at any period among the relevant ones.
# COMMAND ----------
# Inspect the available columns before building the treatment flags.
s.columns
# COMMAND ----------
#hadlost23
# Treatment flags per card from subsidy status in t0/t1/t2 (see markdown:
# hadlost23 = 1,0,0; hadlost24 = 1,1,0; hadkept = 1,1,1; gained ends in 1).
# A null subsidy_t* means the card was not observed in that period.
s['hadlost23'] = (s.subsidy_t0 == 1) & (s.subsidy_t1 == 0)
# Not seen in t1 but confirmed without subsidy in t2 also counts as lost-in-23.
# NOTE(review): assigning 1/True into these bool columns mixes dtypes; the
# `* 1` conversion below normalizes everything to 0/1.
s.loc[(s.subsidy_t0 == 1) & (s.subsidy_t1.isnull()) & (s.subsidy_t2 == 0), 'hadlost23'] = 1
#hadlost24
s['hadlost24'] = (s.subsidy_t0 == 1) & (s.subsidy_t1 == 1) & (s.subsidy_t2 == 0)
#hadkept
s['hadkept'] = (s.subsidy_t0 == 1) & (s.subsidy_t1 == 1) & (s.subsidy_t2 == 1)
# Missing one of the later periods but subsidized in the other: still "kept".
s.loc[(s.subsidy_t0 == 1) & (s.subsidy_t1 == 1) & (s.subsidy_t2.isnull()), 'hadkept'] = True
s.loc[(s.subsidy_t0 == 1) & (s.subsidy_t1.isnull()) & (s.subsidy_t2 == 1), 'hadkept'] = True
# newly
s['gainedhas'] = (s.subsidy_t0 == 0) & (s.subsidy_t2 == 1)
s.loc[(s.subsidy_t0 == 0) & (s.subsidy_t1 == 1) & (s.subsidy_t2.isnull()), 'gainedhas'] = True
# missings
s['missing_after'] = (s.subsidy_t1.isnull()) & (s.subsidy_t2.isnull())
# to dummies instead of booleans and add a categorical variable
categ = ["hadlost23", "hadlost24", "hadkept", "gainedhas", "missing_after"]
s[categ ] = s[categ ] * 1
# check whether they are mutually exclusive
print(s[categ].sum())
print(s[categ].sum(axis = 1).unique())
# add a categorical variable
s["treatment"] = ""
# Zero out flags for cards not present in t0, or absent from both t1 and
# t2, then record the (mutually exclusive) flag name as the treatment label.
for v in categ:
    s.loc[~ s.cards_t0, v] = 0
    s.loc[~ ( (s.cards_t1) |( s.cards_t2)), v] = 0
    s.loc[s[v] == 1, "treatment"] = v
s.treatment.value_counts()
# COMMAND ----------
# The flag columns are now encoded in `treatment`; drop the dummies.
s.drop(columns = categ , inplace = True)
# COMMAND ----------
# no treatment for those anonymous
# Profile-based labels override the subsidy-history labels for these groups.
s.loc[s.profile_final == "anonymous", "treatment"] = "anonymous"
s.loc[s.profile_final == "adulto", "treatment"] = "adulto"
# sample: only cards present before and after
# Cards absent in t0, or absent from both t1 and t2, get no treatment label.
s.loc[~ s.cards_t0, "treatment"] = ""
s.loc[~ ( (s.cards_t1) |( s.cards_t2)), "treatment"] = ""
pd.crosstab(s.profile_final, s.treatment)
# COMMAND ----------
# Was there before policy change or after
s.columns
# COMMAND ----------
# Final card-level treatment-group dataset used by downstream analysis.
s.to_csv(os.path.join(path, 'Workspace/Construct/treatment_groups'+samplesize+'.csv'), index=False)