-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfile_helper.py
167 lines (125 loc) · 5.21 KB
/
file_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import json
import os
import csv
import time
# A function that takes a response object, reads it, and saves it as an HTML file in the outputs folder; it also checks whether a file with the same name already exists and, if so, appends a number to the end of the file name.
# Get the current path.
#a function that takes a 1d list and appends it to a csv file
def list_1d_append_to_csv(data_list, file_path):
    """Append a flat list to a CSV file as one new row.

    The file is opened in append mode with UTF-8 encoding, so it is created
    when it does not exist and rows already in it are kept.

    data_list : values of the new row, one item per column
    file_path : path of the CSV file to append to
    """
    with open(file_path, 'a', newline='', encoding="utf-8") as out_file:
        csv.writer(out_file).writerow(data_list)
def save_data_periodically(data,function,wait_sec=10):
    """Call ``function(data)`` forever, pausing ``wait_sec`` seconds between rounds.

    This never returns on its own, so run it in a background thread
    (``threading`` module) to save the data periodically.

    data : object handed to ``function``; the call is skipped while it is falsy
    function : callable that takes ``data`` as its only argument
    wait_sec : seconds to sleep after every round, default 10

    Exceptions raised by ``function`` are printed and the loop continues.
    Errors from the sleep itself (e.g. a negative ``wait_sec``) propagate.
    """
    while True:
        try:
            if data:
                function(data)
        except Exception as e:
            # Best effort: report the failure and keep the loop alive.
            print("error:",e)
        # Sleep outside the try block so a failing save still waits before the
        # next attempt instead of retrying immediately in a tight loop.
        time.sleep(wait_sec)
def read_csv_file(filename):
    """Read a CSV file and return its header row and data rows separately.

    filename : path of a UTF-8 encoded CSV file
    returns : tuple ``(headers, data)``; ``headers`` is the first row as a
              list of strings and ``data`` is the list of all remaining rows.
              Both are empty lists when the file contains no rows at all.
    """
    with open(filename, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        # The default keeps an empty file from leaking StopIteration to callers.
        headers = next(reader, [])
        data = list(reader)
    return headers, data
def get_file_names(directory_path):
    """Return the names of the regular files located directly in a directory.

    directory_path : directory to list; sub-directories are left out and are
                     not descended into
    returns : list of file names (names only, without the directory part)
    """
    found = []
    for entry in os.listdir(directory_path):
        candidate = os.path.join(directory_path, entry)
        if os.path.isfile(candidate):
            found.append(entry)
    return found
def read_proxies_from_file(file_path):
    """Read ``address:port`` proxies from a text file, one proxy per line.

    Blank lines (including trailing ones at the end of the file) are skipped.

    file_path : path of the proxy list
    returns : list of ``"address:port"`` strings in file order
    raises : ValueError if a non-blank line is not exactly ``address:port``
    """
    proxies = []
    with open(file_path, 'r') as file:
        for line in file:
            entry = line.strip()
            if not entry:
                # An empty line cannot be unpacked into address and port.
                continue
            # The unpacking validates the shape: exactly one ':' is required.
            proxy_address, proxy_port = entry.split(':')
            proxies.append(f'{proxy_address}:{proxy_port}')
    return proxies
def save_list_to_csv(data_list, filename):
    """Write a list of rows to a CSV file, replacing any existing content.

    data_list : iterable of rows, each row being an iterable of column values
    filename : path of the UTF-8 CSV file to create or overwrite
    """
    with open(filename, 'w', newline='', encoding="utf-8") as out_file:
        # writerows emits one CSV line per sublist
        csv.writer(out_file).writerows(data_list)
def csv_to_list(file_path, skip_header=False, one_dim=False):
    """Load a CSV file into a list.

    file_path : path of a UTF-8 encoded CSV file
    skip_header : when True the first collected item is dropped
    one_dim : when True only the first column of every row is kept, which
              gives a flat list; blank lines are skipped in this mode because
              they have no first column
    returns : list of rows (lists of strings), or a list of strings when
              one_dim is True
    """
    # newline='' lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are read back unchanged.
    with open(file_path, 'r', newline='', encoding="utf-8") as file:
        reader = csv.reader(file)
        if one_dim:
            # csv.reader yields [] for a blank line; row[0] would raise IndexError.
            data_list = [row[0] for row in reader if row]
        else:
            data_list = list(reader)
    return data_list[1:] if skip_header else data_list
def save_dict_to_csv(data_dict, filename):
    """Dump a dictionary to a two-column CSV file with a ``key,value`` header.

    data_dict : mapping whose items become the rows, in iteration order
    filename : path of the UTF-8 CSV file to create or overwrite
    """
    columns = ['key', 'value']
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        dict_writer = csv.DictWriter(out_file, fieldnames=columns)
        # header line first, then one CSV line per dictionary entry
        dict_writer.writeheader()
        dict_writer.writerows(
            {'key': k, 'value': v} for k, v in data_dict.items()
        )
def get_html_file_of_url(url,scraper):
    """Fetch ``url`` with ``scraper`` and save the response text via save_file.

    url : address to request
    scraper : object exposing ``get(url)`` whose result has a ``text`` attribute

    The text is handed to save_file with name "view", extension "html" and
    overwrite=True.
    """
    page_text = scraper.get(url).text
    save_file(page_text, "view", "html", overwrite=True)
def append_list_dict_to_csv(dictionary, file_path):
    """
    Append the values of a dictionary of lists to a CSV file as rows.

    Each key is a column and the i-th row holds the i-th item of every list.
    Shorter lists are padded with "" so all columns reach the longest length.
    The keys are written as a header row only when the file is still empty.

    dictionary : must be a dictionary that has lists as values, example -> {a:[1,2],b:[4,5]}
                 an empty dictionary is a no-op and the file is not touched
    file_path : path of the UTF-8 CSV file to append to
    """
    from itertools import zip_longest  # local import: only this helper needs it

    if not dictionary:
        # No columns means nothing to write. Previously this wrote an empty
        # header line and then crashed in max() on an empty sequence.
        return
    with open(file_path, 'a', newline='', encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            writer.writerow(dictionary.keys())  # Write the header if the file is empty
        # zip_longest walks all columns in lockstep and pads the short ones.
        writer.writerows(zip_longest(*dictionary.values(), fillvalue=""))
def get_file_data(file_path):
    """Return the whole content of a text file as a single string.

    file_path : path of the file; it is opened in text mode without an
                explicit encoding
    """
    with open(file_path, 'r') as handle:
        return handle.read()
def read_csv_as_dict(file_path):
    """Read a two-column CSV file into a dictionary.

    The first column of every row becomes the key and the second column the
    value; any further columns are ignored. Every row is treated as data, so
    a header line ends up as an ordinary entry. When a key appears more than
    once the last row wins. Blank lines are skipped.

    file_path : path of a UTF-8 encoded CSV file
    returns : dict mapping first-column strings to second-column strings
    raises : IndexError if a non-blank row has fewer than two columns
    """
    # newline='' lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are read back unchanged.
    with open(file_path, 'r', newline='', encoding="utf-8") as file:
        reader = csv.reader(file)
        pairs = {}
        for row in reader:
            if not row:
                # csv.reader yields [] for a blank line; there is no pair to add.
                continue
            pairs[row[0]] = row[1]
    return pairs
def transform_dict_to_list(key_val_dict):
    """Turn a dictionary into a list of ``[key, float(value)]`` pairs.

    key_val_dict : mapping whose values can be converted with ``float()``
    returns : list of two-item lists, in the dictionary's iteration order
    """
    return [[name, float(number)] for name, number in key_val_dict.items()]
def save_file(data, name, extension, path=None, overwrite=False):
    """Write text to ``<path>/<name>.<extension>`` or to a numbered variant.

    data : text to write (stored as UTF-8)
    name : file name without the extension
    extension : file extension without the leading dot
    path : target directory, created when missing; defaults to the current
           working directory
    overwrite : True  -> write ``name.extension``, replacing an existing file
                False -> write ``name_<i>.extension`` with the first ``i``
                         (counting from 1) whose name is not taken yet
    returns : path of the file that was written
    """
    if path is None:
        path = os.getcwd()
    else:
        # exist_ok avoids the race between checking for and creating the directory.
        os.makedirs(path, exist_ok=True)
    base_path = os.path.join(path, name)
    if overwrite:
        full_path = f'{base_path}.{extension}'
    else:
        # Probe with os.path.exists instead of opening each candidate under a
        # bare except: an unreadable file or a directory holding the name
        # counts as taken rather than being mistaken for a free name.
        i = 1
        while os.path.exists(f'{base_path}_{i}.{extension}'):
            i += 1
        full_path = f'{base_path}_{i}.{extension}'
    with open(full_path, 'w', encoding="utf-8") as f:
        f.write(data)
    return full_path