-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathset_calculator.py
146 lines (137 loc) · 6.85 KB
/
set_calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#! /data/users/dqgu/anaconda3/bin/python
import os
import matplotlib
matplotlib.use('Agg')
from matplotlib import pyplot as plt
import venn
from upsetplot import from_contents
from upsetplot import plot
def _load_sets(files, has_header, set_names):
    """Read the inputs into an ordered list of sets plus a name -> set dict."""
    ordered_sets = []
    named_sets = dict()
    if len(files) >= 2:
        for ind, path in enumerate(files):
            # The context manager closes every handle. The trailing newline is
            # dropped so that a final line lacking one still equals the same
            # line found in another file.
            with open(path) as handle:
                lines = [line.rstrip('\n') for line in handle]
            if has_header:
                lines = lines[1:]
            if set_names is None:
                name = os.path.basename(path).rsplit('.', 1)[0]
            else:
                name = set_names[ind]
            members = set(lines)
            ordered_sets.append(members)
            named_sets[str(name)] = members
    else:
        import pandas as pd
        table = pd.read_table(files[0], header=0 if has_header else None)
        names = table.columns if set_names is None else set_names
        for col in range(table.shape[1]):
            members = set(table.iloc[:, col].dropna())
            ordered_sets.append(members)
            named_sets[str(names[col])] = members
    return ordered_sets, named_sets


def _select_sets(named_sets, group):
    """Split a 'A,B,C' group string and pick the matching named sets."""
    wanted = group.split(',')
    return wanted, {k: v for k, v in named_sets.items() if k in wanted}


def _save_venn(named_sets, out_file, show_percent, **savefig_kwargs):
    """Draw one venn diagram, save it to out_file and close the figure."""
    if show_percent:
        venn.venn(named_sets, cmap="tab10", fmt="{size}\n{percentage:.2f}%", fontsize=9)
    else:
        venn.venn(named_sets, cmap="tab10")
    plt.savefig(out_file, **savefig_kwargs)
    # Close the figure so later plots do not inherit this one as "current".
    plt.close()


def _save_upset(named_sets, out_file):
    """Draw one UpSet intersection plot, save it to out_file and close it."""
    plot(from_contents(named_sets), sum_over=False, sort_categories_by=None, show_counts=True)
    plt.savefig(out_file, dpi=300)
    plt.close()


def run(files:list, exp=None, out_prefix='result', has_header=False,
        intersect_only=True, intersect_xoy=1, union_only=False, show_venn_percent=False,
        set_names:list=None, venn_list:list=None, venn_names:list=None, graph_format='png'):
    """
    Build sets from file content and combine them; by default the intersection of all sets is computed.
    The result is written to '<out_prefix>.list'; unless exp is given, venn and UpSet plots are drawn as well.
    :param files: with a single file, each column is a set whose elements are the cell values;
        with several files, each file is one set whose elements are whole lines.
    :param exp: expression string, e.g. 's1-s2' means the first set minus the second one; sets are numbered
        in the order the files are given. The expression is passed to eval(), so only use trusted input.
    :param out_prefix: prefix of the output file names
    :param has_header: whether the file(s) contain a header line; the header is excluded from the sets
    :param intersect_only: used when no other mode applies: output the elements present in every set
    :param intersect_xoy: if greater than 1, exp is not considered; output every element that occurs in at
        least this many sets, together with the number of sets it occurs in.
    :param union_only: output the union of all sets
    :param show_venn_percent: if set, show percentages in the venn diagram
    :param set_names: names of the sets used for plotting, in the same order as the files; by default the
        file base name without its last extension (or the column label for a single file)
    :param venn_list: used for plotting, e.g. 'A,B,C' 'B,C,D' draws two diagrams, the first with sets A, B, C
        and the second with sets B, C, D. Default None draws one diagram with all sets. A single file may be
        given instead: first column is the set name, second column the group; one diagram is drawn per group.
    :param venn_names: names used in the plot file names, one per entry of venn_list
    :param graph_format: output figure format, default png
    :return: None
    """
    # Sets live in a plain list instead of exec-created 's1', 's2', ... locals:
    # names injected through exec() are not visible to later eval() calls on
    # Python 3.13+, and file paths are no longer spliced into executed source.
    sets, venn_set_dict = _load_sets(files, has_header, set_names)

    result = list()
    count_dict = dict()
    if exp:
        print("do as you say in exp")
        # NOTE(security): exp is evaluated with eval(); never feed it untrusted text.
        namespace = {'s{}'.format(i): s for i, s in enumerate(sets, start=1)}
        result = eval(exp, namespace)
    elif intersect_xoy > 1:
        print('do intersect_xoy')
        occurrences = dict()
        for members in sets:
            for item in members:
                occurrences[item] = occurrences.get(item, 0) + 1
        count_dict = {k: v for k, v in occurrences.items() if v >= intersect_xoy}
        result = set(count_dict)
    elif union_only:
        print('do union only')
        result = set().union(*sets)
    elif intersect_only:
        print('do intersect only')
        result = set.intersection(*sets) if sets else set()

    if not result:
        print('result is empty!')
    else:
        print('result size: {}'.format(len(result)))
    with open(out_prefix + '.list', 'w') as f:
        # Sorted by text form so the output is reproducible between runs;
        # str() keeps non-string cells from a pandas table writable.
        for item in sorted(result, key=str):
            if count_dict:
                f.write('{}\t{}\n'.format(str(item).strip(), count_dict[item]))
            else:
                f.write('{}\n'.format(item))
    if exp:
        return

    # plot venn
    if venn_list is None:
        if 2 <= len(venn_set_dict) <= 6:
            _save_venn(venn_set_dict, out_prefix + f'.venn.{graph_format}', show_venn_percent)
    else:
        if len(venn_list) == 1 and ',' not in venn_list[0]:
            # A single argument without commas is a grouping file: name <tab/space> group.
            with open(venn_list[0]) as f:
                group_dict = dict(x.split()[:2] for x in f if x.strip())
            grouped = dict()
            for set_name, group_name in group_dict.items():
                grouped.setdefault(group_name, []).append(set_name)
            venn_list = [','.join(members) for members in grouped.values()]
            venn_names = list(grouped)
        if venn_names is None:
            venn_names = [group.replace(',', '-') for group in venn_list]
        for group, name in zip(venn_list, venn_names):
            groups, tmp_dict = _select_sets(venn_set_dict, group)
            if 2 <= len(tmp_dict) <= 6:
                out_name = out_prefix + '.{}.venn.{}'.format(name, graph_format)
                _save_venn(tmp_dict, out_name, show_venn_percent, dpi=300)
            else:
                print('venn for {}?'.format(groups))
                print('venn only support 2-6 sets')

    # intersection plot
    if venn_list is None:
        if len(venn_set_dict) <= 8:
            _save_upset(venn_set_dict, '{}.upSet.{}'.format(out_prefix, graph_format))
    else:
        for group, name in zip(venn_list, venn_names):
            _, tmp_dict = _select_sets(venn_set_dict, group)
            if len(tmp_dict) > 1:
                _save_upset(tmp_dict, '{}.{}.upSet.{}'.format(out_prefix, name, graph_format))
if __name__ == '__main__':
    # Script entry point: hand the module namespace to the external xcmds
    # helper, restricted to `run` via `include`.
    # NOTE(review): presumably xcmds builds a command-line interface from
    # run's signature and docstring - confirm against the xcmds package.
    from xcmds.xcmds import xcmds
    xcmds(locals(), include=['run'])