-
Notifications
You must be signed in to change notification settings - Fork 4
/
requirements.py
297 lines (233 loc) · 9.74 KB
/
requirements.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import with_statement, division, absolute_import
import re
import os
import os.path
import time
import functools
import threading
import sublime
import sublime_plugin
import configparser
from urllib.request import Request, urlopen
unicode_type = str
class SimpleCache(object):
"""Dumb cache with TTL"""
def __init__(self):
self._dict = {}
def set(self, key, value, ttl=60):
self._dict[key] = (value, time.time() + ttl)
def get(self, key, default=None):
(value, expire) = self._dict.get(key, (None, 0))
if expire < time.time():
return default
return value
def clear(self):
self._dict.clear()
CACHE = SimpleCache()
def get_pip_index():
"""return url of pypi xmlrpc endpoint"""
settings = sublime.load_settings('requirementstxt.sublime-settings')
pip_index = "https://pypi.python.org/pypi" # xmlrpc
pip_index = os.environ.get("PIP_INDEX", pip_index)
try:
parser = configparser.SafeConfigParser()
parser.read(os.path.expanduser("~/.pip/pip.conf"))
pip_index = parser.get("global", "index")
except configparser.Error:
pass # just ignore
pip_index = settings.get("pip_index", pip_index)
return pip_index
def status_message(msg):
"""Workaround for osx run_on_main_thread problem"""
sublime.set_timeout(functools.partial(sublime.status_message, msg), 0)
def _fetch_packages():
"""Does the actual package list fetch, returns a list of unicode names"""
status_message("requirements.txt: listing packages...")
query = b'''<?xml version='1.0'?>\n<methodCall>\n<methodName>list_packages</methodName>\n<params></params>\n</methodCall>\n'''
req = Request(get_pip_index(), data=query, headers={"Content-Type": "text/xml"})
result = urlopen(req).read().decode("utf-8")
if "<fault>" in result:
packages = []
else:
packages = re.findall("<string>(.+?)</string>", result)
status_message("requirements.txt: got {count}".format(count=len(packages)))
if not isinstance(packages[0], unicode_type):
packages = [pkg.decode("utf-8") for pkg in packages]
pkg_dict = dict(((name.lower(), name) for name in packages))
CACHE.set("--packages--", pkg_dict, ttl=5 * 60)
def list_packages():
"""Return a DICT of lowercase_name -> CaseSensitive_Name of packages
available on get_pip_index() server"""
cached = CACHE.get("--packages--", None)
if cached is not None:
return cached
# thread has 30 seconds to get packages, otherwise cache will
# timeout and next thread will be spawned
CACHE.set("--packages--", {}, ttl=30)
threading.Thread(target=_fetch_packages).start()
return {}
## Yanked from pkg_resources
component_re = re.compile(r'(\d+ | [a-z]+ | \.| -)', re.VERBOSE)
replace = {
'pre': 'c',
'preview': 'c',
'-': 'final-',
'rc': 'c',
'dev': '@'
}
def _parse_version_parts(s):
for part in component_re.split(s):
part = replace.get(part, part)
if part in ['', '.']:
continue
if part[:1] in '0123456789':
yield part.zfill(8) # pad for numeric comparison
else:
yield '*' + part
yield '*final' # ensure that alpha/beta/candidate are before final
def _releases(name, show_hidden=False):
"""Because ServerProxy().package_releases() is soooo f*$& broken
under different ST2/3 + osx/windows/linux configurations"""
template = '''<?xml version='1.0'?>\n<methodCall>\n<methodName>package_releases</methodName>\n<params>\n<param>\n<value><string>{name}</string></value>\n</param>\n<param>\n<value><boolean>{flag}</boolean></value>\n</param>\n</params>\n</methodCall>\n'''
flag = 1 if show_hidden else 0
payload = template.format(name=name, flag=flag).encode("utf-8")
req = Request(get_pip_index(), data=payload, headers={"Content-Type": "text/xml"})
result = urlopen(req).read().decode("utf-8")
if "<fault>" in result:
return []
matches = re.findall("<string>(.+?)</string>", result)
return matches
def releases(name, show_hidden=False):
"""Return sorted list of releases for given package name
If show_hidden is set to true, returns all packages
"""
key = "{name}-{hidden}".format(name=name, hidden=show_hidden)
cached = CACHE.get(key)
if cached:
return cached
rels = _releases(name, show_hidden)
sorted_releases = sorted(rels, key=lambda a: tuple(_parse_version_parts(a)))
CACHE.set(key, sorted_releases, ttl=2 * 60)
return sorted_releases
def requirements_file(view):
"""Return true if given view should be treated as requirements.txt file"""
fname = view.file_name()
if not fname:
return False
basename = os.path.basename(fname)
if basename == "requirements.txt":
return True
dirname = os.path.basename(os.path.dirname(fname))
if dirname == "requirements" and fname.endswith(".txt"):
return True
return False
def requirements_view(view):
return "source.requirementstxt" in view.scope_name(view.sel()[0].begin())
def package_name(line):
"""Parse requirements.txt line and return package name
possibly with extras"""
match = re.match("(.*?)[<=>].*", line)
if not match:
return line
return match.group(1).strip()
def normalized_name(package_line):
"""Reurn lowercase package name and extras (unchanged) or None"""
lower = package_line.lower()
extras_match = re.search(r'\[(.*)\]', package_line)
extras = extras_match.group(1) if extras_match else None
return re.sub(r'\[.*\]', "", lower), extras
def strict_version(version):
"""Return a hard pinned version string"""
return "==" + version
def non_strict_version(version):
"""Where possible, return soft pinned pip version text,
while still keeping package in the current major release line.
If semver parsing fails for any reason, returns soft pinned
version without upper limit"""
try:
next_major = str(int(version.split(".", 1)[0]) + 1)
next_version = ".".join([next_major] + ["0" for _ in version.split(".")[1:]])
except:
return ">=%s" % (version,) # pytz ;-(
else:
return ">=%s,<%s" % (version, next_version)
def selected_lines(view):
"""Iterate over selected lines in given view"""
view.run_command("split_selection_into_lines")
for sel in view.sel():
for line in view.lines(sel):
yield line, view.substr(line)
class RequirementsClearCache(sublime_plugin.WindowCommand):
"""Forced pypi cache clear"""
def run(self):
CACHE.clear()
sublime.status_message("requirements.txt: cache cleared")
class RequirementsAutoVersion(sublime_plugin.TextCommand):
def run(self, edit, strict=False):
if not requirements_view(self.view):
return True
pkg_dict = list_packages()
for line_sel, line in selected_lines(self.view):
lower_pkg_name, extras = normalized_name(package_name(line))
if lower_pkg_name not in pkg_dict:
continue
real_name = pkg_dict[lower_pkg_name]
sorted_releases = releases(real_name)
if extras:
full_name = "{name}[{extras}]".format(name=real_name, extras=extras)
else:
full_name = real_name
version = sorted_releases[-1]
if strict:
version_string = strict_version(version)
else:
version_string = non_strict_version(version)
self.view.replace(edit, line_sel, full_name + version_string)
class RequirementsReplaceLine(sublime_plugin.TextCommand):
def run(self, edit, line_value):
# damn you ST3 ;)
self.view.replace(edit, self.view.line(self.view.sel()[0]), line_value)
class RequirementsPromptVersion(sublime_plugin.TextCommand):
def run(self, edit, strict=False):
if not requirements_view(self.view):
return True
line_sel, line = next(selected_lines(self.view), (None, None))
if not line:
# either no selection or empty line
return
pkg_dict = list_packages()
lower_pkg_name, extras = normalized_name(package_name(line))
if lower_pkg_name not in pkg_dict:
return
real_name = pkg_dict[lower_pkg_name]
versions = list(reversed(releases(real_name, True)))
full_name = real_name
if extras:
full_name += "[{extras}]".format(extras=extras)
ver_func = strict_version if strict else non_strict_version
choices = [full_name + ver_func(version) for version in versions]
callback = functools.partial(self.on_done, choices)
self.view.window().show_quick_panel(choices, callback, 0, 0)
def on_done(self, choices, picked):
if picked == -1:
return
self.view.run_command("requirements_replace_line", {
"line_value": choices[picked]
})
class RequirementsEventListener(sublime_plugin.EventListener):
def on_query_completions(self, view, prefix, locations):
if not requirements_view(view):
return True
pkg_dict = list_packages()
lower_prefix = prefix.lower()
completions = [(pkg, pkg) for lower_name, pkg in pkg_dict.items() if lower_name.startswith(lower_prefix)]
return completions, sublime.INHIBIT_WORD_COMPLETIONS | sublime.INHIBIT_EXPLICIT_COMPLETIONS
def on_load(self, view):
if not requirements_file(view):
return
syntax_file = "Packages/requirementstxt/requirementstxt.tmLanguage"
if hasattr(sublime, "find_resources"):
syntax_file = sublime.find_resources("requirementstxt.tmLanguage")[0]
view.set_syntax_file(syntax_file)