-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathloader.py
294 lines (245 loc) · 14.3 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import polars as pl
import easygui
import logging as log
from geopy.geocoders import Nominatim
import os.path
from typing import Dict, List, Tuple
from io import StringIO
# Logging setup. NOTE: the log is written to loader_log.txt.
# filemode="w" means the log file is overwritten on every run.
log.basicConfig(
filename="loader_log.txt",
encoding="utf-8",
filemode="w",
format="{asctime} - {levelname} - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
level=log.INFO
)
def fetch_csv(service:str = "",ticket:str = "", params_plus:dict = {}, auth:Tuple[str, str] = None) -> 'StringIO': # manual_login formát: (jméno, heslo)
import requests
assert service != "", "Service is necessary"
url = "https://ws.ujep.cz/ws/services/rest2" + service
params = {
"outputFormat":"CSV",
"outputFormatEncoding":"utf-8"
}
params.update(params_plus)
cookies = {}
if ticket != "":
cookies.update({"WSCOOKIE":ticket})
data = requests.get(url, params=params, cookies=cookies, auth=auth)
wrap = StringIO(data.text)
return wrap
def getColumnTrans() -> Dict[str, str]:
    """Return the column-rename mapping.

    Keys are column names in the table being loaded; values are the column
    names used in the table being saved.
    """
    # A "ciziSkolaNazev" -> "Univerzita" entry is deliberately not part of the mapping.
    # NOTE(review): "Stá" may be a truncated "Stát" — confirm against the input sheet header.
    pairs = [
        ("Erasmus kód", "ERASMUS CODE"),
        ("Město", "Město"),
        ("Stá", "Stát"),
        ("kodyIscedUvedeneUDomacichPodmSml", "Obor"),
        ("Interní název smlouvy (Erasmus kód + katedry)", "Fullname"),
    ]
    return dict(pairs)
def unite_cols(new_schools: pl.DataFrame, name_gen: pl.DataFrame) -> pl.DataFrame:
    """Unify the column names of the loaded table with those of the saved table.

    Drops the "Název univerzity" column, renames columns according to
    ``getColumnTrans()``, left-joins ``name_gen`` on "ERASMUS CODE" and keeps
    only "Univerzita" followed by the renamed columns.
    """
    renames: Dict[str, str] = getColumnTrans()
    wanted: List[str] = ["Univerzita", *renames.values()]

    renamed = new_schools.drop("Název univerzity").rename(renames)
    with_names = renamed.join(name_gen, "ERASMUS CODE", "left")
    return with_names.select(wanted)
def extract_dptmnts(new_schools: pl.DataFrame, faculty: str) -> pl.DataFrame:
    """Derive the "Katedry" (departments) column from the "Fullname" column.

    The list of workplaces under ``faculty`` is fetched from the
    "/ciselniky/getSeznamPracovist" service; abbreviations of those whose name
    matches "[Kk]atedra" form the "all departments" string. Rows whose
    "Fullname" is "STORNO" are dropped. "Fullname" is stripped of spaces,
    split on commas and its first element discarded (presumably the Erasmus
    code — confirm against the input data); the rest becomes "Katedry".
    Departments are then collected per "ERASMUS CODE", one row is kept per
    code, and the value "všechnykatedry" is replaced by the full list.
    """
    # Fetch the faculty's workplaces and build the comma-separated department list.
    query = {
        "typPracoviste": "K",
        "zkratka": "%",
        "nadrazenePracoviste": faculty,
    }
    workplaces = pl.read_csv(
        fetch_csv(service="/ciselniky/getSeznamPracovist", params_plus=query),
        separator=";",
    )
    log.info(workplaces.head())
    is_department = pl.col("nazev").str.contains("[Kk]atedra")
    abbreviations = workplaces.filter(is_department).get_column("zkratka").to_list()
    all_departments = ", ".join(abbreviations)

    # Turn "Fullname" into a list column "Katedry" (first element skipped).
    as_list = (
        pl.col("Fullname")
        .str.replace_all(" ", "")
        .str.split(",")
        .list.slice(offset=1)
        .alias("Katedry")
    )
    active = pl.col("Fullname") != "STORNO"
    new_schools = new_schools.filter(active).with_columns(as_list).drop("Fullname")
    log.info(new_schools.head())

    # Gather every department per school, then attach them to one row per school.
    per_school = (
        new_schools.explode("Katedry")
        .lazy()
        .group_by("ERASMUS CODE")
        .agg(pl.col("Katedry"))
        .collect()
    )
    one_row_each = new_schools.unique("ERASMUS CODE").drop("Katedry")
    merged = one_row_each.join(per_school, "ERASMUS CODE", "left").with_columns(
        pl.col("Katedry").list.join(", ").name.keep()
    )

    # Expand the "all departments" placeholder into the actual list.
    resolved = (
        pl.when(pl.col("Katedry") == "všechnykatedry")
        .then(pl.lit(all_departments))
        .otherwise(pl.col("Katedry"))
        .name.keep()
    )
    return merged.with_columns(resolved)
def rename_subs(new_schools: pl.DataFrame) -> pl.DataFrame:
    """Translate field-of-study codes into names. DEPRECATED.

    Reads the code-to-name table from "cz_isced_f_systematicka_cast.xlsx",
    strips the "– obory d. n." / "– obory j. n." suffix (case-insensitive)
    from the names, explodes the comma-separated "Obor" codes, joins the names
    in as "Obory" and finally re-attaches the per-school joined code string
    as "Obor".
    """
    # Table of field names; "[dj]" in the regex matches a single "d" or "j".
    cleaned_name = (
        pl.col("Název")
        .str.replace_all(r"(?i)– obory [dj]\. n\.", "")
        .str.strip_chars_end()
        .alias("Based")
    )
    field_names = (
        pl.read_excel("cz_isced_f_systematicka_cast.xlsx")
        .with_columns(cleaned_name)
        .drop("Název")
        .rename({"Based": "Název"})
    )

    # Split the code string into a list and give every code its own row.
    split_codes = (
        new_schools.with_columns(pl.col("Obor").str.split(", ").alias("Obor2"))
        .drop("Obor")
        .rename({"Obor2": "Obor"})
    )
    exploded = (
        split_codes.explode("Obor")
        .join(field_names, "Obor", how="left")
        .rename({"Název": "Obory"})
        .unique()
    )

    # Aggregate the field CODES per school and attach them to the exploded table.
    codes_per_school = (
        exploded.lazy()
        .group_by("ERASMUS CODE")
        .agg(pl.col("Obor"))
        .collect()
        .with_columns(pl.col("Obor").list.join(", ").name.keep())
    )
    result = exploded.drop("Obor").join(codes_per_school, "ERASMUS CODE", "left")

    log.info(result.columns)
    return result
def rename_subs_hard(new_schools: pl.DataFrame, old_schools: pl.DataFrame) -> pl.DataFrame:
    """Merge field-of-study codes of new and stored schools and attach field names.

    For every "ERASMUS CODE" the codes from ``new_schools`` and ``old_schools``
    are combined and de-duplicated. The combined code string ends up in "Obor",
    and each individual code is translated to a name in "Obory" using
    "cz_isced_f_systematicka_cast.xlsx". Because the codes are exploded before
    the name lookup, the result can hold several rows per school.
    """
    # Codes of the incoming schools, collapsed to one joined string per school.
    incoming = (
        new_schools.select("ERASMUS CODE", "Obor")
        .with_columns(pl.col("Obor").str.split(", ").name.keep())
        .explode("Obor")
        .lazy()
        .group_by("ERASMUS CODE")
        .agg(pl.col("Obor"))
        .collect()
        .with_columns(pl.col("Obor").list.join(", ").alias("Obor_neu"))
        .drop("Obor")
        .unique("ERASMUS CODE")
    )
    stored = old_schools.select("ERASMUS CODE", "Obor").unique("ERASMUS CODE")

    # Concatenate new and stored codes, then drop duplicate values.
    all_codes = (
        pl.concat_str(
            [pl.col("Obor_neu"), pl.col("Obor")],
            separator=", ",
            ignore_nulls=True,
        )
        .str.split(", ")
        .list.unique()
        .alias("Kódy oborů")
    )
    mediator: pl.DataFrame = (
        incoming.join(stored, "ERASMUS CODE", "left")
        .with_columns(all_codes)
        .drop("Obor_neu", "Obor")
    )

    # Keep the codes twice: once as a joined string, once exploded row by row.
    mediator = (
        mediator.rename({"Kódy oborů": "Obor"})
        .with_columns(pl.col("Obor").list.join(", ").alias("Kódy oborů"))
        .explode("Obor")
    )

    # Field-name table, cleaned of the "– obory d. n." variants and trailing
    # whitespace; "[dj]" in the regex matches a single "d" or "j".
    code_trans = pl.read_excel("cz_isced_f_systematicka_cast.xlsx")
    code_trans = code_trans.with_columns(
        pl.col("Název")
        .str.replace_all(r"(?i)– obory [dj]\. n\.", "")
        .str.strip_chars_end()
        .name.keep()
    )

    # Field names -> mediator, then mediator -> new_schools.
    mediator = (
        mediator.join(code_trans, "Obor", "left")
        .drop("Obor")
        .rename({"Název": "Obory"})
    )
    deduplicated = new_schools.unique("ERASMUS CODE")
    return (
        deduplicated.join(mediator, "ERASMUS CODE", "left")
        .drop("Obor")
        .rename({"Kódy oborů": "Obor"})
    )
def rozdel_katedry(new_schools: pl.DataFrame) -> pl.DataFrame:
    """Split the "Katedry" column so that every department gets its own row.

    Args:
        new_schools: Table with a string column "Katedry" holding
            comma-separated department abbreviations.

    Returns:
        The table with "Katedry" exploded to one department per row.

    Raises:
        ValueError: If the table has no "Katedry" column.
    """
    if "Katedry" not in new_schools.columns:
        raise ValueError("Sloupec 'Katedry' nebyl nalezen v tabulce.")

    # Remove ALL spaces before splitting. ``str.replace`` only replaces the
    # first match, which left later items with a leading space.
    departments = pl.col("Katedry").str.replace_all(" ", "").str.split(",")
    return new_schools.with_columns(departments).explode("Katedry")
def get_url(new_schools: pl.DataFrame, url_source: pl.DataFrame) -> pl.DataFrame:
    """Attach a "URL" column built from the "Website Url" column of ``url_source``.

    ``url_source`` is left-joined on "ERASMUS CODE". Values not starting with
    "http" get an "https://" prefix; the raw "Website Url" column is dropped.
    """
    site = pl.col("Website Url")
    # Potentially obsolete: the visualizer can be set up to render the column as hyperlinks.
    lacks_scheme = site.str.starts_with("http").not_()
    url = pl.when(lacks_scheme).then("https://" + site).otherwise(site).alias("URL")

    joined = new_schools.join(url_source, "ERASMUS CODE", "left")
    return joined.with_columns(url).drop("Website Url")
def get_coords(new_schools: pl.DataFrame, address_source: pl.DataFrame) -> pl.DataFrame:
    """Attach "Longitude" and "Latitude" columns obtained from Nominatim.

    Each school present in both tables is geocoded by its "Univerzita" name
    first. If that yields nothing, a structured street/city/country query
    built from the "Address" list is tried. Schools that cannot be located
    get null coordinates.

    Args:
        new_schools: Table with "ERASMUS CODE" and "Univerzita" columns.
        address_source: Table with "ERASMUS CODE" and "Address" columns, where
            "Address" is a list read as [street, city, country].

    Returns:
        ``new_schools`` left-joined with string "Longitude"/"Latitude" columns.
    """
    from geopy.extra.rate_limiter import RateLimiter

    print("Získávám geokoordinace. Prosím, počkejte chvíli, tohle bude trvat.")

    # Keep only school codes, names and addresses, one row per school.
    address_source = address_source.join(
        new_schools.select("ERASMUS CODE", "Univerzita"), "ERASMUS CODE", "inner"
    ).unique("ERASMUS CODE")

    # NOTE(review): the user_agent value looks like an obfuscated e-mail
    # placeholder and is tied to a personal account — confirm and replace it
    # with a proper application identifier.
    geolocator = Nominatim(user_agent="[email protected]")
    # Throttle requests and retry transient service errors; after the retries
    # are exhausted the limiter returns None, which is handled below.
    geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)

    codes = address_source.get_column("ERASMUS CODE").to_list()
    names = address_source.get_column("Univerzita").to_list()
    addresses = address_source.get_column("Address").to_list()

    relocations = {}
    for code, name, address in zip(codes, names, addresses):
        # Round 1: look the school up by its name.
        location = geocode(name) if name else None
        # Round 2: fall back to the postal address, skipping missing parts.
        if location is None and address is not None:
            query = {
                part: value
                for part, value in zip(("street", "city", "country"), address)
                if value
            }
            if query:
                location = geocode(query)
        relocations[code] = location

    # Build the coordinate table. The explicit schema keeps the columns as
    # strings even when the table is empty or every lookup failed.
    rows = {"ERASMUS CODE": [], "Longitude": [], "Latitude": []}
    for code, location in relocations.items():
        rows["ERASMUS CODE"].append(code)
        rows["Longitude"].append(None if location is None else str(location.longitude))
        rows["Latitude"].append(None if location is None else str(location.latitude))
    coords = pl.DataFrame(
        rows,
        schema={
            "ERASMUS CODE": pl.String,
            "Longitude": pl.String,
            "Latitude": pl.String,
        },
    )
    return new_schools.join(coords, "ERASMUS CODE", "left")
def table_overwriter(excel_file, fac) -> int:
    """Load new schools from ``excel_file`` and merge them into "schools.xlsx".

    The new table is enriched step by step (unified columns, departments,
    fields of study, URL, geocoordinates, faculty) and then replaces any
    stored rows that share an "ERASMUS CODE" with it.

    Args:
        excel_file: Excel file with the new schools.
        fac: Faculty written to the "Fakulta" column and used for the
            department lookup.

    Returns:
        Always 0; counting rows with missing coordinates is currently disabled.
    """
    # Loading: use the stored table if there is one, otherwise an empty
    # all-string table with the expected columns.
    if os.path.exists("schools.xlsx"):
        current_schools = pl.read_excel("schools.xlsx")
    else:
        stored_columns = [
            "ERASMUS CODE",
            "Fakulta",
            "Katedry",
            "Univerzita",
            "Město",
            "Stát",
            "Longitude",
            "Latitude",
            "URL",
            "Obor",
            "Obory",
        ]
        current_schools = pl.from_dict(
            {column: pl.Series(dtype=pl.String) for column in stored_columns}
        )
    new_schools = pl.read_excel(excel_file)
    # NOTE(review): "Erasms Code" is assumed to match the header in url_gen.xlsx — confirm.
    addresses = (
        pl.read_excel("url_gen.xlsx")
        .rename({"Erasms Code": "ERASMUS CODE", "Legal Name": "Univerzita"})
        .with_columns(
            pl.concat_list(pl.col("Street"), pl.col("City"), pl.col("Country Cd")).alias("Address")
        )
        .select("ERASMUS CODE", "Univerzita", "Website Url", "Address")
    )
    log.info("Tables read successfully.")

    # Two sample schools whose rows are logged after each step.
    sample_rows = (pl.col("ERASMUS CODE") == "BG ROUSSE01") | (pl.col("ERASMUS CODE") == "E JAEN01")

    # Unify the existing columns.
    new_schools = unite_cols(new_schools, addresses.select("ERASMUS CODE", "Univerzita"))
    log.info("Columns united.")
    log.info(new_schools.filter(sample_rows).head())

    # Add departments.
    new_schools = extract_dptmnts(new_schools, fac)
    log.info("Departments extracted.")
    log.info(new_schools.filter(sample_rows).head(15))

    # Translate fields of study.
    new_schools = rename_subs_hard(new_schools, current_schools)
    log.info("Renamed cols.")
    log.info(new_schools.filter(sample_rows).head())

    # Add URLs.
    new_schools = get_url(new_schools, addresses.select("ERASMUS CODE", "Website Url"))
    log.info("Fetched url.")
    log.info(new_schools.head())

    # Add geocoordinates.
    new_schools = get_coords(new_schools, addresses.select("ERASMUS CODE", "Address"))
    log.info("Fetched geocoords.")
    log.info(new_schools.head())
    log.info(new_schools.columns)

    # Merge and write.
    faculty_column = pl.Series("Fakulta", [fac] * len(new_schools), dtype=pl.String)
    new_schools = new_schools.insert_column(1, faculty_column)
    # TODO: add functionality that merges faculties for the same school.
    new_schools = new_schools.select(current_schools.columns)
    # Stored rows absent from the new table are kept; the new rows go below them.
    merged = current_schools.join(new_schools, "ERASMUS CODE", "anti")
    merged.vstack(new_schools, in_place=True)
    merged.write_excel("schools.xlsx")
    log.info("Done!")
    return 0
# ------
def parseLines(excel_file:any) -> List[str]:
log.info("Starting file parsing by ascertaining file input type.")
if type(excel_file) != str:
try:
excel_file = excel_file.name
except:
log.info("Type determination failed at input type determination. Throwing error.")
raise ValueError("File has been inputted via an illegal method.")
log.info(excel_file)
match excel_file.split(".")[-1]:
case "xlsx":
log.info("File is an excel file.")
buffer = pl.read_excel(excel_file)
return buffer.to_series(buffer.get_column_index("ERASMUS CODE")).to_list()
case "txt":
log.info("File is a text file.")
with open(excel_file, "r") as txtfile:
return txtfile.read().split('\n')
case _:
log.info("Uh oh.")
raise ValueError("File is of an unreadable format.")
def table_eraser(excel_file) -> int:
    """Remove the schools listed in ``excel_file`` from "schools.xlsx".

    Args:
        excel_file: File with the ERASMUS codes to delete, as accepted by
            ``parseLines``.

    Returns:
        The number of rows removed from the stored table.

    Raises:
        FileNotFoundError: If "schools.xlsx" does not exist.
    """
    if not os.path.exists("schools.xlsx"):
        raise FileNotFoundError("First, make sure to have some schools.")
    log.info("Starting eraser.")

    codes_to_remove: List[str] = parseLines(excel_file)
    before = pl.read_excel("schools.xlsx")
    # Keep only rows whose code is not on the removal list.
    after = before.filter(~pl.col("ERASMUS CODE").is_in(codes_to_remove))
    after.write_excel("schools.xlsx")

    log.info("Eraser finished correctly.")
    return len(before) - len(after)
if __name__ == "__main__":
    # Both dialogs return None when cancelled; in that case do nothing
    # instead of passing None into the import.
    faculty = easygui.enterbox("Zadejte fakultu pod kterou přidat")
    source_file = None
    if faculty is not None:
        # filetypes is passed as a list, matching the table_eraser call below.
        source_file = easygui.fileopenbox("Vyberte soubor s novými školami: ", "Hi", filetypes=["*.xlsx"])
    if faculty is None or source_file is None:
        log.info("Import cancelled: no faculty or input file was provided.")
    else:
        table_overwriter(source_file, fac=faculty)
    # Alternative entry point for deleting schools (disabled):
    #table_eraser(easygui.fileopenbox("Vyberte soubor obsahující školy k vymazání.", filetypes=["*.xlsx", "*.txt"]))