-
Notifications
You must be signed in to change notification settings - Fork 1
/
moss.py
128 lines (107 loc) · 3.92 KB
/
moss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import glob
import socket
from urllib.parse import urlparse
BUFSIZE = 1024
SUPPORTED_LANGUAGES = (
'c',
'cc',
'java',
'ml',
'pascal',
'ada',
'lisp',
'scheme',
'haskell',
'fortran',
'ascii',
'vhdl',
'perl',
'matlab',
'python',
'mips',
'prolog',
'spice',
'vb',
'csharp',
'modula2',
'a8086',
'javascript',
'plsql',
'verilog')
for lang in SUPPORTED_LANGUAGES:
globals()['MOSS_LANG_%s' % lang.upper()] = lang
class MOSSException(Exception):
pass
class MOSS(object):
def __init__(self, user_id, language,
moss_host='moss.stanford.edu',
moss_port=7690,
sensitivity=10, # -m
comment='', # -c
matching_file_limit=250, # -n
use_experimental_server=False, # -x
directory=False):
if language not in SUPPORTED_LANGUAGES:
raise ValueError(
"language '%s' not in %s" % (language, SUPPORTED_LANGUAGES))
self.user_id = user_id
self.language = language
self.moss_host = moss_host
self.moss_port = moss_port
self.sensitivity = sensitivity
self.comment = comment
self.matching_file_limit = matching_file_limit
self.use_experimental_server = use_experimental_server
self.directory = directory
self.staged_files = [[], []]
def add_file_from_disk(self, path, wildcard=False, display_name=None,
**kwargs):
if wildcard and display_name:
raise ValueError(
'wildcard mode incompatible with display_name')
files = glob.glob(path) if wildcard else [path]
for path in files:
with open(path, 'rb') as f:
self.add_file_from_memory(path, f.read(),
display_name=display_name, **kwargs)
def add_file_from_memory(self, virtual_path, content,
base=False, display_name=None):
if display_name is None:
display_name = virtual_path.replace(' ', '_')
self.staged_files[base].append((virtual_path, content, display_name))
def _url_is_valid(self, url):
try:
result = urlparse(url)
return all([result.scheme, result.netloc, result.path])
except ValueError:
return False
def _process_file(self, sock, file_id, path, content, display_name):
sock.sendall(b'file %d %s %d %s\n' % (file_id, self.language.encode('utf-8'),
len(content), display_name.encode('utf-8')))
sock.sendall(content)
def process(self):
sock = socket.socket()
sock.connect((self.moss_host, self.moss_port))
sock.sendall(b'moss %d\n' % self.user_id)
sock.sendall(b'directory %d\n' % int(self.directory))
sock.sendall(b'X %d\n' % int(self.use_experimental_server))
sock.sendall(b'maxmatches %d\n' % self.sensitivity)
sock.sendall(b'show %d\n' % self.matching_file_limit)
sock.sendall(b'language %s\n' % self.language.encode('utf-8'))
resp = sock.recv(BUFSIZE)
if resp.strip() == b'no':
raise MOSSException(
"language '%s' not accepted by server" % self.language)
for path, content, name in self.staged_files[1]:
self._process_file(sock, 0, path, content, name)
for i, (path, content, name) in enumerate(self.staged_files[0]):
self._process_file(sock, i + 1, path, content, name)
sock.sendall(b'query 0 %s\n' % self.comment.encode('utf-8'))
resp = sock.recv(BUFSIZE)
sock.sendall(b'end\n')
sock.close()
url = resp.strip().decode('utf-8')
if not self._url_is_valid(url):
raise MOSSException(
"server returned invalid response URL '%s'" % url)
return url