-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
148 lines (143 loc) · 5.1 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from lib.preprocessing.openlibrary import (
OL_txt_to_csv,
OL_preprocess_raw_dataset,
OL_join_authors_books,
)
from lib.preprocessing.feltrinelli import (
preprocessing_feltrinelli,
join_fltr_ol,
fltr_RL_ol,
merge_enriched_books,
)
from lib.scraping.scrape import (
scrape_from_feltrinelli,
enr_from_mondadori,
enr_from_hoepli,
)
from lib.utility.lib import load_to_hdfs
from lib.utility.data_analysis import analysis_data
from dask.distributed import Client
import dask.dataframe as dd
import dask.bag as db
import config as cfg
import os
import pandas as pd
import dask
def main(
    config,
    scrape_feltrinelli=False,
    prep_feltrinelli=False,
    txt_to_csv_books=False,
    txt_to_csv_authors=False,
    join_authors_books=False,
    enrich_fltr=False,
    redefine_fltr=False,
):
    """Handler of the entire pipeline.

    Each stage is opt-in through its own flag; enabled stages always run in
    the order listed below. All settings are read from ``config`` (the object
    passed by the caller), not from a module-level global.

    Args:
        config (config): Object which contains all the configurations of the
            project. The stages read these attributes from it, each used as a
            mapping: ``scraping_fltr``, ``preprocessing_fltr``, ``books``,
            ``authors``, ``prp_raw_authors``, ``prp_raw_books``, ``ol_join``,
            ``enrich_fltr``, ``enrich_mondadori``, ``enrich_hoepli`` and
            ``data_analysis``. Only the attributes of enabled stages are
            accessed.
        scrape_feltrinelli (bool, optional): Set True for scraping books
            catalog from Feltrinelli. Defaults to False.
        prep_feltrinelli (bool, optional): Set True to run
            ``preprocessing_feltrinelli`` on the file configured in
            ``preprocessing_fltr``. Defaults to False.
        txt_to_csv_books (bool, optional): Set True for converting OL books
            txt to CSV. Defaults to False.
        txt_to_csv_authors (bool, optional): Set True for converting OL
            authors txt to CSV. Defaults to False.
        join_authors_books (bool, optional): Set True to enrich OL's books
            catalog with OL's authors catalog. Defaults to False.
        enrich_fltr (bool, optional): Set True to enrich Feltrinelli's
            catalogue with Open Library, Mondadori and Hoepli's books
            catalogue. Defaults to False.
        redefine_fltr (bool, optional): Set True to run ``analysis_data`` with
            the paths configured in ``data_analysis``. Defaults to False.

    Returns:
        None. The results of the stages are not returned to the caller.
    """
    if scrape_feltrinelli:
        scraping = config.scraping_fltr
        scrape_from_feltrinelli(
            scraping["books_for_page"],
            scraping["timeout"],
            scraping["path_output"],
            scraping["name_file_out"],
        )

    if prep_feltrinelli:
        prep = config.preprocessing_fltr
        preprocessing_feltrinelli(
            prep["input_file_path"],
            prep["path_file_out"],
            prep["name_file_out"],
        )

    if txt_to_csv_books:
        books = config.books
        OL_txt_to_csv(
            books["path_input_books"],
            books["path_splitted_catalog"],
            books["path_full_catalog"],
            books["name_full_books_catalog"],
            books["books_cols"],
            books["dtypes"],
            books["books_chunksize"],
        )

    if txt_to_csv_authors:
        authors = config.authors
        OL_txt_to_csv(
            authors["path_input_authors"],
            authors["path_splitted_catalog"],
            authors["path_full_catalog"],
            authors["name_full_authors_catalog"],
            authors["authors_cols"],
            authors["dtypes"],
            authors["authors_chunksize"],
        )

    if join_authors_books:
        # Authors are preprocessed before books, as in the original order.
        authors_clean = OL_preprocess_raw_dataset(
            config.prp_raw_authors["raw_ol_authors_path"],
            config.prp_raw_authors["dtypes_auth"],
        )
        books_clean = OL_preprocess_raw_dataset(
            config.prp_raw_books["raw_ol_books_path"],
            config.prp_raw_books["dtypes_books"],
        )
        OL_join_authors_books(
            books_clean,
            authors_clean,
            config.ol_join["output_file_out"],
            config.ol_join["name_csv_out"],
            config.ol_join["join_mode"],
            config.ol_join["join_key"],
        )

    if enrich_fltr:
        # The boolean parameter ``enrich_fltr`` and the config attribute of the
        # same name are distinct: the latter is always reached via ``config``.
        enrich = config.enrich_fltr
        fltr_enr_ol_join, fltr_no_joined = join_fltr_ol(
            enrich["fltr_cleaned_path"],
            enrich["ol_joined_auth_book_path"],
            enrich["fltr_left_enr_path_out"],
            enrich["fltr_left_enr_name_file_out"],
        )
        # Second pass over the rows the first join left out.
        fltr_enr_ol_RL = fltr_RL_ol(
            fltr_no_joined, enrich["ol_joined_auth_book_path"]
        )
        fltr_ol_final_enr = merge_enriched_books(
            fltr_enr_ol_join,
            fltr_enr_ol_RL,
            os.path.join(
                config.preprocessing_fltr["path_file_out"],
                config.preprocessing_fltr["name_file_out"],
            ),
        )
        fltr_enr_mondadori = enr_from_mondadori(
            fltr_ol_final_enr,
            config.enrich_mondadori["path_output"],
            config.enrich_mondadori["name_file_out"],
        )
        # The Hoepli result is not used afterwards, so it is not bound to a
        # name; the call itself is kept.
        enr_from_hoepli(
            fltr_enr_mondadori,
            config.enrich_hoepli["path_output"],
            config.enrich_hoepli["name_file_out"],
            config.enrich_hoepli["timeout"],
        )

    if redefine_fltr:
        analysis = config.data_analysis
        analysis_data(
            analysis["data_set_path"],
            analysis["corrected_year_path"],
            analysis["corrected_category_path"],
        )
if __name__ == "__main__":
    # Start a local Dask cluster; web interface --> http://127.0.0.1:8787/status
    # Using the client as a context manager closes it when the pipeline
    # finishes or raises, instead of leaving cleanup to interpreter exit.
    with Client(n_workers=2, threads_per_worker=2, memory_limit="2GB") as client:
        print(f"DASK'S CLUSTER INITIALIZE...\n{client}")
        # Every stage is disabled by default; switch on the ones to run.
        main(
            cfg,
            scrape_feltrinelli=False,
            prep_feltrinelli=False,
            txt_to_csv_books=False,
            txt_to_csv_authors=False,
            join_authors_books=False,
            enrich_fltr=False,
            redefine_fltr=False,
        )