倒排索引的Python实现
索引保存了每个词出现的文章编号以及出现的次数
未保存更详细的位置信息等
实现了通过倒排索引检索文档,不过效果上略差于BM25检索
"""
coding:UTF-8
author:LemontreeN
date:2022-05-08
"""
import json
import math
import tqdm
from ltp import LTP
ltp = LTP(path="base")# LTP "base" segmentation model (larger, more accurate)
# ltp = LTP()# "small" model (faster, less accurate)
class PreProcessed:
    """Inverted-index builder and simple boolean/TF-IDF retrieval engine.

    The index maps each word to a postings list of ``[pid, frequency]``
    pairs (document id and in-document count). No positional information
    is stored. Retrieval quality is noted to be slightly below BM25.
    """

    def __init__(self):
        # Stop-word set; populated by read_stop_words(), None until then.
        self.stop_words = None
        self.cnt = 0
        # On-disk location of the serialized inverted index.
        self.index_path = 'data/inverted_index.txt'
        # word -> postings list. After generate_index(): [[pid, freq], ...];
        # after read_index(): list of their string forms, e.g. '[12, 3]'.
        self.word_dict = {}
        # Per-document token counts, in corpus order.
        self.cnts = []

    def read_stop_words(self, file_path: str):
        """Load stop words (one per line) into a set for O(1) membership tests.

        :param file_path: path to a UTF-8 stop-word list.
        """
        with open(file_path, 'r', encoding='utf-8') as fp:
            self.stop_words = set(fp.read().split('\n'))

    def generate_index(self, input_path: str):
        """Build the inverted index from a JSON-lines corpus and save it.

        Each input line is a JSON object with keys ``pid`` and ``document``
        (a list of sentences). Sentences are segmented with LTP; stop words
        are skipped. Writes the index to ``self.index_path`` and per-document
        token counts to ``data/words.txt``.

        :param input_path: path to the JSON-lines corpus.
        """
        # 14768 = corpus size; only used to size the progress bar.
        progress = tqdm.tqdm(range(14768), '建立索引中,目前进度')
        with open(input_path, 'r', encoding='utf-8') as js:
            for line in js:
                data = json.loads(line)
                pid = data.get('pid')
                seg = ltp.seg(data.get('document'))
                data['document'] = [' '.join(item) for item in seg[0]]
                word_cnt = 0
                for sentence in seg[0]:
                    word_cnt += len(sentence)
                    for word in sentence:
                        if word in self.stop_words:
                            continue
                        postings = self.word_dict.setdefault(word, [])
                        # Bump the count if this pid already has a posting,
                        # otherwise start a new [pid, 1] entry.
                        for posting in postings:
                            if posting[0] == pid:
                                posting[1] += 1
                                break
                        else:
                            postings.append([pid, 1])
                progress.update(1)
                self.cnts.append(word_cnt)
        with open(self.index_path, 'w', encoding='utf-8') as index_output:
            for word, postings in self.word_dict.items():
                index_output.write(str(word) + ';;;')
                for posting in postings:
                    index_output.write(str(posting) + '.')
                index_output.write('\n')
        with open('data/words.txt', 'w', encoding='utf-8') as fp:
            for cnt in self.cnts:
                fp.write('%d\n' % cnt)
        # NOTE: the original called exit(0) here, which killed the whole
        # interpreter and made this method unusable as an API — removed.

    def read_index(self):
        """Load the inverted index and per-document word counts from disk."""
        # 355109 = number of index lines; only sizes the progress bar.
        progress = tqdm.tqdm(range(355109), '读取索引中,目前进度')
        with open('data/words.txt', 'r', encoding='utf-8') as fp:
            self.cnts = fp.read().split('\n')
        with open(self.index_path, 'r', encoding='utf-8') as fp:
            for line in fp:
                parts = line.split(';;;')  # separator chosen to not occur in words
                if len(parts) != 2:
                    print('error')
                    # Was `pass`: a malformed line fell through and crashed
                    # on parts[1]. Skip it instead.
                    continue
                word, index = parts[0], parts[1]
                # Strip the trailing '.\n' before splitting posting strings.
                self.word_dict[word] = index[:-2].split('.')
                progress.update(1)
        print('-----***索引读取完毕***-----')

    @staticmethod
    def _parse_posting(posting: str) -> tuple:
        """Parse a serialized posting like '[12, 3]' into (pid, freq)."""
        pid_part, fre_part = posting.split(',')
        return int(pid_part[1:]), int(fre_part[:-1])  # strip '[' and ']'

    def search(self, conds: str) -> list:
        """Retrieve the top-3 documents matching the query.

        The query is segmented with LTP and the terms are OR-combined
        ('||'); documents are ranked by a smoothed TF-IDF sum.

        :param conds: free-text query.
        :return: list of up to 3 pids, or [-1] when nothing matches.
        """
        seg, _ = ltp.seg([conds])
        conds = '||'.join(seg[0])
        if conds == '!quit':
            return
        pid_list = []
        weight = []
        if '&&' in conds:
            # AND: intersect the postings of every term.
            terms = conds.split('&&')
            # Treat an unknown first term as an empty postings list
            # (was a TypeError: `item in None`).
            pid_list = self.word_dict.get(terms[0]) or []
            for term in terms[1:]:
                other = self.word_dict.get(term) or []
                pid_list = [item for item in other if item in pid_list]
        elif '||' in conds:
            # OR: accumulate TF-IDF weight per document across all terms.
            pid_index = {}  # pid -> position in pid_list/weight (O(1) lookup)
            for cond in conds.split('||'):
                postings = self.word_dict.get(cond)
                if postings is None:
                    continue
                idf = 1 / len(postings)  # df-based inverse document frequency
                for posting in postings:
                    pid, fre = self._parse_posting(posting)
                    tf_idf = math.log(fre + 3) * idf  # smoothed tf; tune as desired
                    if pid in pid_index:
                        weight[pid_index[pid]] += tf_idf
                    else:
                        pid_index[pid] = len(pid_list)
                        pid_list.append(pid)
                        weight.append(tf_idf)
        else:
            # Single term: raw postings lookup.
            pid_list = self.word_dict.get(conds)
        if not pid_list:
            print('None!')
            return [-1]
        if not weight:
            # The AND / single-term branches compute no weights; fall back to
            # uniform weights so ranking works (was a NameError on `weight`).
            weight = [1.0] * len(pid_list)
        ranked = sorted(zip(weight, pid_list), reverse=True)
        print(ranked[0])
        return [pid for _, pid in ranked[:3]]
if __name__ == "__main__":
    # Interactive entry point: either (1) build the index from the corpus
    # or (2) load an existing index and run a sample query.
    pre = PreProcessed()
    choice = input('\n****Inverted Index IR System****\nChoice:\n1. 读取文件建立索引\n2. 使用已有索引查询')
    if choice == '1':
        pre.read_stop_words('data/stopwords.txt')
        pre.generate_index('data/passages_multi_sentences.json')
    elif choice == '2':
        pre.read_index()
        pre.search('家||中国')
    # Blog footer text was fused onto this line in the scraped source,
    # making it a syntax error — removed.
    exit(0)