This repository has been archived by the owner on Jan 28, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.py
376 lines (306 loc) · 12.7 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import os
import requests
import pandas as pd
import openpyxl
import pickle
from time import sleep
from bs4 import BeautifulSoup
from urllib import parse
from tqdm import tqdm_notebook
from typing import List
from glob import glob
def get_firm_uid(header_uids: dict, name_list: list) -> List[str]:
    """Look up the qichacha.com UID for every company name in ``name_list``.

    One search request is sent per name.  The UID is read from the ``value``
    attribute of the first ``<input>`` element matching the result-row
    selector below.

    Args:
        header_uids: HTTP headers sent with every search request.
        name_list: Company names to search for.

    Returns:
        The UIDs that were found, in request order.  A name whose result page
        contains no matching element is printed and skipped, so the returned
        list can be shorter than ``name_list``.
        NOTE(review): callers that later zip the result with the original
        name list will be misaligned after a skipped name -- confirm callers
        filter the names that were printed.
    """
    # CSS path of the <input> whose ``value`` attribute holds the UID.
    uid_selector = ("#searchlist table.m_srchList "
                    "tbody#search-result tr.frtrt "
                    "td.checktd label.text-dark-lter "
                    "input")
    # Build every search URL up front so a bad name fails before any request.
    uid_request_url_list = ["https://www.qichacha.com/search?key={}"
                            .format(parse.quote(name)) for name in name_list]
    uid_list = []
    progress = tqdm_notebook(zip(uid_request_url_list, name_list),
                             total=len(name_list))
    for request_no, (url, name) in enumerate(progress, start=1):
        # Simulate a human operator: pause 5 seconds before every 20th request.
        if request_no % 20 == 0:
            sleep(5)
        response = requests.get(url, headers=header_uids)
        soup = BeautifulSoup(response.content, features="lxml")
        uid_input = soup.select_one(uid_selector)
        if uid_input is None:
            # No result row matched; report the company name and carry on
            # with the next one.
            print(name)
            continue
        uid_list.append(uid_input.get('value'))
    return uid_list
def get_basic_info_soup(header_basic_info: dict,
                        uid_list: list) -> List[BeautifulSoup]:
    """Download and parse the firm page of every UID in ``uid_list``.

    Args:
        header_basic_info: HTTP headers sent with every page request.
        uid_list: Company UIDs; each one is inserted into the firm-page URL.

    Returns:
        One ``BeautifulSoup`` document (lxml parser) per UID, in input order.
    """
    firm_page_urls = ["https://www.qichacha.com/firm_{}.html".format(uid)
                      for uid in uid_list]
    parsed_pages = []
    for position, page_url in enumerate(tqdm_notebook(firm_page_urls), start=1):
        # Simulate a human operator: pause 5 seconds before every 20th request.
        if position % 20 == 0:
            sleep(5)
        page = requests.get(page_url, headers=header_basic_info)
        parsed_pages.append(BeautifulSoup(page.content, features="lxml"))
    return parsed_pages
def parse_basic_info(basic_soup: BeautifulSoup) -> dict:
    """Extract the basic company facts from a parsed firm page.

    Keys of the returned dict are the label texts found in the page (plus a
    few fixed keys set below); values are the stripped cell texts.

    Args:
        basic_soup: Parsed firm page, as produced by ``get_basic_info_soup``.

    Returns:
        A dict of label -> value strings.  The investment and branch counts
        are the string ``'None'`` when their panel is absent from the page.

    Raises:
        AttributeError: If a required panel or element is missing.
        IndexError: If a table has fewer rows or cells than expected.
    """
    info_dict = {}

    def _add_two_pairs(row):
        # A regular row holds two label/value pairs: td0/td1 and td2/td3.
        cells = row.find_all('td')
        info_dict[cells[0].text.strip()] = cells[1].text.strip()
        info_dict[cells[2].text.strip()] = cells[3].text.strip()

    #
    # Main information - website
    #
    top_panel = basic_soup.select_one("#company-top")
    contact_row = (top_panel.find('div', {'class': 'row'})
                   .find('div', {'class': 'dcontent'})
                   .find('div', {'class': 'row'}))
    # The last <span> of that row is used; only its first
    # whitespace-separated token is kept.
    info_dict['网站'] = contact_row.find_all('span')[-1].text.strip().split()[0]

    #
    # Business registration information
    #
    tables = basic_soup.select_one("#base_div #Cominfo").select("table")
    legal_rep_table, detail_table = tables[0], tables[1]
    # Legal representative
    info_dict["法定代表人信息"] = legal_rep_table.find(
        "h2", {'class': 'seo font-20'}).text

    # Parse the row list once instead of re-running find_all for every row.
    rows = detail_table.find_all('tr')

    # Expected rows 0-6, two pairs each:
    #   0: registered capital, paid-in capital
    #   1: operating status, date of establishment
    #   2: unified social credit code, taxpayer ID
    #   3: registration number, organization code
    #   4: company type, industry
    #   5: approval date, registration authority
    #   6: region, English name
    # Index explicitly (not by slice) so a short table still raises IndexError.
    for row_index in range(7):
        _add_two_pairs(rows[row_index])

    # Row 7: former names, number of insured employees.  The first value's
    # whitespace-separated tokens are joined with ", ".
    cells = rows[7].find_all('td')
    info_dict[cells[0].text.strip()] = ', '.join(cells[1].text.strip().split())
    info_dict[cells[2].text.strip()] = cells[3].text.strip()

    # Row 8: staff size, business term
    _add_two_pairs(rows[8])

    # Row 9: company address -- only the first line of the cell text is kept.
    cells = rows[9].find_all('td')
    info_dict[cells[0].text.strip()] = cells[1].text.strip().split('\n')[0]

    # Row 10: business scope
    cells = rows[10].find_all('td')
    info_dict[cells[0].text.strip()] = cells[1].text.strip()

    #
    # Outbound investments and branches: both panels are optional and expose
    # their count in a <span class="tbadge"> inside the panel's first <div>.
    #
    optional_panels = (
        ("#base_div #touzilist", "对外投资数量"),    # number of investments
        ("#base_div #branchelist", "分支机构数量"),  # number of branches
    )
    for panel_selector, count_key in optional_panels:
        panel = basic_soup.select_one(panel_selector)
        if panel is None:
            info_dict[count_key] = 'None'
        else:
            info_dict[count_key] = panel.select_one("div").find(
                "span", {'class': 'tbadge'}).text
    return info_dict
def get_dev_info_soup(header_dev_info: dict,
                      company_list: list,
                      uid_list: list) -> List[BeautifulSoup]:
    """Download and parse the "report" tab of each company.

    Args:
        header_dev_info: Base HTTP headers; a per-company ``Referer`` pointing
            at the company's firm page is added to a copy for each request.
        company_list: Company names, paired positionally with ``uid_list``.
        uid_list: Company UIDs.

    Returns:
        One ``BeautifulSoup`` document (lxml parser) per request, in order.
    """
    referer_template = "https://www.qichacha.com/firm_{}.html"
    report_template = ("https://www.qichacha.com/company_getinfos?"
                       "unique={0}&"
                       "companyname={1}&"
                       "tab=report")

    # One header dict per UID: the shared headers plus that firm's Referer.
    per_company_headers = [
        dict(header_dev_info, Referer=referer_template.format(uid))
        for uid in uid_list
    ]
    # One report URL per (uid, name) pair, with the name URL-quoted.
    report_urls = [
        report_template.format(uid, parse.quote(name))
        for uid, name in zip(uid_list, company_list)
    ]

    reports = []
    jobs = zip(per_company_headers, report_urls)
    for request_headers, report_url in tqdm_notebook(
            jobs, total=len(per_company_headers)):
        # Simulate a human operator: wait 5 seconds before every request.
        sleep(5)
        reply = requests.get(report_url, headers=request_headers)
        reports.append(BeautifulSoup(reply.content, features="lxml"))
    return reports
def parse_dev_info(dev_soup: BeautifulSoup) -> dict:
    """Extract the report data from a parsed company "report" page.

    Three tables are read from the ``<div id="0">`` panel: basic company
    information, asset status and, when present, social insurance.  Keys of
    the returned dict are the label texts found in the page.

    Args:
        dev_soup: Parsed report page, as produced by ``get_dev_info_soup``.

    Returns:
        A dict of label -> value strings.  The social-insurance entries are
        omitted when that table cannot be located.

    Raises:
        AttributeError: If the main panel is missing.
        IndexError: If a required table has fewer rows or cells than
            expected, or if a ``<div>`` in the panel has no text.
        ValueError: If the asset-status heading is not found.
    """
    info_dict = {}

    def _add_two_pairs(row):
        # A regular row holds two label/value pairs: td0/td1 and td2/td3.
        cells = row.find_all('td')
        info_dict[cells[0].text.strip()] = cells[1].text.strip()
        info_dict[cells[2].text.strip()] = cells[3].text.strip()

    def _add_first_line_pair(row):
        # Single label/value row where only the first line of the value
        # cell is kept.
        cells = row.find_all('td')
        info_dict[cells[0].text.strip()] = cells[1].text.strip().split('\n')[0]

    main_panel = dev_soup.find("div", {'id': 0})
    # Parse the table list once; the panel is not modified while reading.
    tables = main_panel.find_all('table')

    #
    # Basic company information (first table)
    #
    rows = tables[0].find_all('tr')
    # Expected rows 0-3, two pairs each:
    #   0: registration number, unified social credit code
    #   1: operating status, contact phone
    #   2: number of employees, postal code
    #   3: whether shareholder equity was transferred this year,
    #      whether the company holds investments / equity in other companies
    for row_index in range(4):
        _add_two_pairs(rows[row_index])
    # Row 4: e-mail address (single pair, full cell text)
    cells = rows[4].find_all('td')
    info_dict[cells[0].text.strip()] = cells[1].text.strip()
    # Row 5: mailing address
    _add_first_line_pair(rows[5])

    #
    # Asset status information
    #
    # The first whitespace-separated token of every <div> text is compared
    # against the section heading.
    # NOTE(review): the heading's position among all <div>s is reused as the
    # table index, which presumably relies on one <div> per table in the
    # report markup -- confirm against a real page.
    table_names = [div.text.strip().split()[0]
                   for div in main_panel.find_all('div')]
    rows = tables[table_names.index("企业资产状况信息")].find_all('tr')
    # Expected rows 0-3, two pairs each:
    #   0: total assets, total owners' equity
    #   1: total operating revenue, total profit
    #   2: main-business revenue, net profit
    #   3: total tax paid, total liabilities
    for row_index in range(4):
        _add_two_pairs(rows[row_index])

    #
    # Social insurance information (optional table)
    #
    try:
        social_table = tables[table_names.index("社保信息")]
    except (ValueError, IndexError):
        # Heading absent, or no table at that position: nothing more to read.
        return info_dict
    rows = social_table.find_all('tr')
    # Row 0: urban employee basic pension insurance, basic medical insurance
    _add_two_pairs(rows[0])
    # Row 1: maternity insurance, unemployment insurance
    _add_two_pairs(rows[1])
    # Row 2: work-injury insurance
    _add_first_line_pair(rows[2])
    return info_dict
def fill_excel(path_to_sample_file: str,
               output_dir: str,
               all_df: pd.DataFrame) -> None:
    """Write one filled-in copy of the sample workbook per company.

    For every row of ``all_df`` the sample workbook is loaded afresh, the
    cells of its ``sheet1`` worksheet are filled from the row, and the result
    is saved into ``output_dir`` under a name built from the row's ``id`` and
    ``name`` values.

    Args:
        path_to_sample_file: Path of the template ``.xlsx`` file.
        output_dir: Directory that receives the generated workbooks.
        all_df: One row per company; must contain the columns read below.
    """
    # (cell, source column) pairs for the basic-information section.
    basic_info_cells = (
        ('C3', 'name'),              # company name
        ('C4', '统一社会信用代码'),    # unified social credit code
        ('C5', '法定代表人信息'),      # legal representative
        ('C6', '注册资本'),           # registered capital
        ('C7', '成立日期'),           # date of establishment
        ('C8', '企业地址'),           # business address
        ('C10', '法定代表人信息'),     # contact person (legal representative)
        ('C11', '企业联系电话'),       # contact phone
        ('E3', '企业类型'),           # company type
        ('E4', '经营范围'),           # main business activities
        ('E5', '网站'),              # company website
        ('E6', '分支机构数量'),        # branch field (filled with branch count)
        ('E7', '所属地区'),           # branch address field (filled with region)
        ('E10', '对外投资数量'),       # investment field (filled with count)
        ('E11', '电子邮箱'),          # e-mail
    )
    # (cell, source column) pairs for last year's financial figures.
    financial_cells = (
        ('C15', '营业总收入'),        # operating revenue
        ('C16', '利润总额'),          # total profit
        ('C17', '纳税总额'),          # total tax paid
    )

    company_count = len(all_df)
    for _, company in tqdm_notebook(all_df.iterrows(), total=company_count):
        # Start from a fresh copy of the template for every company.
        workbook = openpyxl.load_workbook(path_to_sample_file)
        sheet = workbook['sheet1']

        for cell, column in basic_info_cells:
            sheet[cell] = company[column]

        # Number of employees (C14): fall back to the pension-insurance
        # figure when the company chose not to disclose its head count.
        if company['从业人数'] != '企业选择不公示':
            sheet['C14'] = company['从业人数']
        else:
            sheet['C14'] = company['城镇职工基本养老保险']

        for cell, column in financial_cells:
            sheet[cell] = company[column]

        # Save the filled workbook as an .xlsx file.
        output_file_name = '{0}-招商目标企业信息收集-{1}.xlsx'.format(
            company['id'], company['name'])
        workbook.save(os.path.join(output_dir, output_file_name))