-
-
Notifications
You must be signed in to change notification settings - Fork 126
/
Copy pathapi_test.py
134 lines (112 loc) · 3.73 KB
/
api_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
API测试。
测试方法:
1. 确保路径配置为:ASSETS_PATH=./
2. 删除或重命名当前的数据库assets.db
3. 在项目跟目录执行pytest或直接执行本文件即可进行测试
"""
import time
import pytest
import requests
from utils import get_hash
upload_file = 'test.png'
def read_file(path):
with open(path, 'r', encoding='utf-8') as f:
return f.read()
def wait_server_ready():
for i in range(100):
try:
requests.get('http://127.0.0.1:8085/', timeout=1)
except:
time.sleep(5)
continue
return
print("Server is not ready!")
exit(1)
def setup_function():
wait_server_ready()
def test_index():
# 测试主页
response = requests.get('http://127.0.0.1:8085/')
assert response.status_code == 200
# 由于不同平台的换行不一样,下面的测试可能会报错
# text = response.text
# index_html = read_file("static/index.html")
# assert text == index_html
def test_api_scan():
response = requests.get('http://127.0.0.1:8085/api/scan')
assert response.status_code == 200
data = response.json()
assert data["status"] == "start scanning"
# 马上请求第二次,应该返回正在扫描
response = requests.get('http://127.0.0.1:8085/api/scan')
assert response.status_code == 200
data = response.json()
assert data["status"] == "already scanning"
def test_api_status():
response = requests.get('http://127.0.0.1:8085/api/status')
assert response.status_code == 200
data = response.json()
assert data["status"] is True
# 等待扫描完成
for i in range(100):
response = requests.get('http://127.0.0.1:8085/api/status')
data = response.json()
if data["status"] is not False:
time.sleep(3)
continue
break
assert data["status"] is False
def test_api_clean_cache():
response = requests.get('http://127.0.0.1:8085/api/clean_cache')
assert response.status_code == 204
response = requests.post('http://127.0.0.1:8085/api/clean_cache')
assert response.status_code == 204
def test_api_match():
payload = {
"positive": "white",
"negative": "",
"top_n": "6",
"search_type": 0,
"positive_threshold": 10,
"negative_threshold": 10,
"image_threshold": 85,
"img_id": -1,
"path": "test.png",
"start_time": None,
"end_time": None,
}
# 文字搜图
response = requests.post('http://127.0.0.1:8085/api/match', json=payload)
data = response.json()
assert len(data) == 1
assert data[0]["path"] == "test.png"
assert data[0]["score"] != 0
# 以图搜图
with requests.session() as sess:
# 测试上传图片
files = {'file': ('test.png', open(upload_file, 'rb'), 'image/png')}
response = sess.post('http://127.0.0.1:8085/api/upload', files=files)
assert response.status_code == 200
# 测试以图搜图
payload["search_type"] = 1
response = sess.post('http://127.0.0.1:8085/api/match', json=payload)
data = response.json()
assert len(data) == 1
assert data[0]["path"] == "test.png"
assert data[0]["score"] != 0
# 测试下载图片
response = requests.get('http://127.0.0.1:8085/' + data[0]["url"])
assert response.status_code == 200
with open(upload_file, "rb") as f:
hash_origin = get_hash(f)
hash_download = get_hash(response.content)
assert hash_origin == hash_download
# TODO:以数据库的图搜图和视频
# TODO:文字搜视频
# TODO:以图搜视频
# TODO:get_video
# 运行测试
if __name__ == '__main__':
pytest.main()
# TODO: 测试login和logout