This repository has been archived by the owner on Jan 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dynamicSpy.py
107 lines (94 loc) · 3.83 KB
/
dynamicSpy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -*- coding: UTF-8 -*-
# Import modules
from bilibili_api import sync, search, settings, video
from collections import OrderedDict
import json
import random
import time
import requests
# HTTP headers passed to requests.get() in getDynamic: a bilibili.com Referer
# and a desktop Chrome User-Agent string.
headers = {'Referer': 'https://www.bilibili.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'}
# Optional proxy setting on the imported bilibili_api `settings` module (disabled):
#settings.proxy = "http://ProxyAddress.cat" # fill in the proxy address
#settings.proxy = "http://account:password@ProxyAddress.cat" # if the proxy needs an account and password (placeholder values)
def wait(sec=1):
    """Pause execution for ``sec`` seconds (one second by default).

    Used to throttle the crawler between requests: according to the original
    author's note, requesting too quickly gets blocked with a -412 response.
    Pass a different value at the call site to change the delay.
    """
    delay = sec
    time.sleep(delay)
def getDynamic(keyword, offset=None, printOrNot=False):
    """Fetch one page of dynamics for a topic keyword.

    Sends a GET request to the ``fetch_dynamics`` topic endpoint with
    ``topic_name=keyword`` and ``sortby=2``, using the module-level
    ``headers``.

    Args:
        keyword: Topic name to query.
        offset: Paging offset to send with the request; ``None`` sends no
            offset (first page).
        printOrNot: When true, print a progress line before the request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On a network failure, or when
            the server does not respond within the timeout.
        ValueError: If the response body is not valid JSON.
    """
    params = {"topic_name": keyword, "sortby": 2}
    if offset is None:
        if printOrNot:
            print("正在打印“", keyword, "”。无偏移值")
    else:
        params["offset"] = offset
        if printOrNot:
            print("正在打印“", keyword, "”。偏移值", offset)
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would hang the whole crawl.
    getInfo = requests.get(
        "http://api.vc.bilibili.com/topic_svr/v1/topic_svr/fetch_dynamics",
        headers=headers, params=params, timeout=10)
    return getInfo.json()
def getRawData(keyword="", pages=1, printOrNot=False):
    """Collect the raw dynamic cards of a topic across up to ``pages`` pages.

    Calls ``getDynamic`` repeatedly, feeding each response's ``data.offset``
    into the next request, and stops early when the response reports no more
    data.

    Args:
        keyword: Topic name to query.
        pages: Maximum number of pages to fetch; values below 1 fetch nothing.
        printOrNot: When true, print progress messages.

    Returns:
        A list with the entries of every fetched page's ``data.cards``, in
        fetch order. If a response has no usable ``data`` (for example an
        error payload), paging stops and the cards collected so far are
        returned instead of raising.
    """
    offset = None
    result = []
    for page in range(1, pages + 1):
        if printOrNot:
            print("获取第{}页".format(page))
        pendingData = getDynamic(keyword, offset=offset, printOrNot=printOrNot)
        # Tolerate a missing/null "data", "cards" or "offset" field rather
        # than aborting the whole run with KeyError/TypeError.
        payload = pendingData.get("data") or {}
        # extend() appends in place instead of recopying the accumulated list.
        result.extend(payload.get("cards") or [])
        offset = payload.get("offset")
        # Without a next offset the following request would just re-fetch the
        # first page, so treat that the same as "no more data".
        if not payload.get("has_more") or offset is None:
            if printOrNot:
                print("nomore")
            break
        # Throttle after every page; this also spaces out the first request
        # of whatever keyword the caller fetches next.
        wait()
    return result
def makeResultJsonFriendly(keyword="", pages=1, printOrNot=False):
    """Fetch a topic's dynamics and reduce the picture posts to flat records.

    Each raw card's ``card`` field is a JSON string; it is decoded and, when
    it contains ``item.pictures``, turned into a small dict. Cards without
    pictures, or with missing fields, are skipped.

    Args:
        keyword: Topic name to query.
        pages: Maximum number of pages to fetch.
        printOrNot: When true, print progress messages.

    Returns:
        A list of dicts with the keys ``username``, ``userid``,
        ``firstPicture`` (``img_src`` of the first picture), ``dynamicID``
        and ``dynamicIDStr`` (the same ID as a string).
    """
    pendingData = getRawData(keyword=keyword, pages=pages, printOrNot=printOrNot)
    data = []
    for rawCard in pendingData:
        cardJson = json.loads(rawCard["card"])
        if "item" not in cardJson or "pictures" not in cardJson["item"]:
            continue  # not a picture dynamic
        try:
            data.append({"username": cardJson["user"]["name"],
                         "userid": cardJson["user"]["uid"],
                         # "content": cardJson["item"]["description"],  (disabled in the original)
                         "firstPicture": cardJson["item"]["pictures"][0]["img_src"],
                         "dynamicID": rawCard["desc"]["dynamic_id"],
                         "dynamicIDStr": str(rawCard["desc"]["dynamic_id"])
                         })
        except (KeyError, IndexError):
            # Incomplete card: a field is missing, or "pictures" is an empty
            # list (previously an uncaught IndexError). Skip it deliberately.
            continue
    return data
def MutiDataSpy(pages=1, printOrNot=False):
    """Run ``makeResultJsonFriendly`` for every keyword in ``keyWords.txt``.

    The keyword file is read from the current working directory as UTF-8,
    one keyword per line. Blank lines are ignored.

    Args:
        pages: Maximum number of pages to fetch per keyword.
        printOrNot: When true, print progress messages.

    Returns:
        The concatenated result lists of all keywords, in file order.

    Raises:
        FileNotFoundError: If ``keyWords.txt`` does not exist.
    """
    with open('keyWords.txt', 'r', encoding='UTF-8') as f:
        # Iterating by line keeps the final keyword even when the file has no
        # trailing newline (the previous split('\n')[:-1] dropped it), and
        # blank lines are skipped instead of being queried as "" keywords.
        KeyWords = [line.rstrip('\n') for line in f if line.strip()]

    pendingData = []
    for keyword in KeyWords:
        pendingData.extend(makeResultJsonFriendly(
            keyword=keyword, pages=pages, printOrNot=printOrNot))
    return pendingData
if __name__ == '__main__':
    # Scrape up to 5 pages for each keyword listed in keyWords.txt, printing
    # progress. (The original comment here described a keyword-list and a
    # "more precise data" parameter, which does not match this call.)
    result = MutiDataSpy(pages=5, printOrNot=True)
    # Write one JSON object per line. The `with` block closes the file even
    # if serialising or writing a record fails.
    with open('resultDynamic.json', 'w', encoding="utf-8") as resultJson:
        for i in result:
            resultJson.write(json.dumps(i) + '\n')