forked from amitbeka/conda-merge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conda_merge.py
230 lines (179 loc) · 7.38 KB
/
conda_merge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/python3
# encoding: utf-8
"""Tool to merge environment files of the conda package manager.
Given a list of environment files, print a unified environment file.
Usage: conda-merge file1 file2 ... [> unified-environment]
Merge strategy for each part of the definition:
name: keep the last name, if any is given (according to the order of the files).
channels: merge the channel priorities of all files and keep each file's priorities
in the same order. If there is a collision between files, report an error.
dependencies: merge the dependencies and remove duplicates, sorts alphabetically.
conda itself can handle cases like [numpy, numpy=1.7] gracefully so no need
to do that. You may beautify the dependencies by hand if you wish.
The script also doesn't detect collisions, relying on conda to point that out.
"""
import argparse
from collections import OrderedDict, deque
from copy import deepcopy
import sys
import yaml
__version__ = '0.1.4'
class MergeError(Exception):
    """Signal that conda-merge could not combine the environment files.

    Raised mainly when channels or dependencies cannot be merged.
    """
def merge_envs(args):
    """Merge the environment files named in `args.files` and print the result.

    `args` is a Namespace-like object; `args.files` must be a list of paths to
    YAML environment files.

    The `name`, `channels` and `dependencies` entries of all files are merged
    (see `merge_names`, `merge_channels` and `merge_dependencies`); an entry
    whose merged value is empty is left out of the output.

    No return value, the unified YAML document is written to stdout.
    If the channel priorities cannot be merged, a message is printed to stderr
    and the MergeError is re-raised.
    """
    # An empty YAML file is loaded as None; treat it as an empty definition
    # so the `.get()` lookups below keep working.
    env_definitions = [read_file(path) or {} for path in args.files]
    unified_definition = {}

    name = merge_names(env.get('name') for env in env_definitions)
    if name:
        unified_definition['name'] = name

    try:
        channels = merge_channels(env.get('channels') for env in env_definitions)
    except MergeError as exc:
        print("Failed to merge channel priorities.\n{}\n".format(exc.args[0]),
              file=sys.stderr)
        raise
    if channels:
        unified_definition['channels'] = channels

    deps = merge_dependencies(env.get('dependencies') for env in env_definitions)
    if deps:
        unified_definition['dependencies'] = deps

    # dump the unified environment definition to stdout
    yaml.dump(unified_definition, sys.stdout,
              indent=2, default_flow_style=False)
def parse_args(argv=None):
    """Parse the command line and return the resulting Namespace.

    `argv` is an optional list of argument strings; it is handed to argparse
    unchanged (None by default). The returned Namespace has a `files`
    attribute holding the one or more positional file paths.
    """
    parser = argparse.ArgumentParser(
        # the module docstring doubles as the --help text
        description=sys.modules[__name__].__doc__,
        # raw formatter: keep the docstring's own line breaks
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('files', nargs='+')
    namespace = parser.parse_args(argv)
    return namespace
def read_file(path):
    """Open the file at `path` and return its content parsed by `yaml.safe_load`."""
    with open(path) as stream:
        parsed = yaml.safe_load(stream)
    return parsed
def merge_names(names):
    """Pick the environment name to use: the last non-blank entry of `names`.

    `names` is any iterable of names; falsy entries (None, '') are ignored.
    Returns None when no usable name is found.
    """
    chosen = None
    for candidate in names:
        if candidate:
            # later files override earlier ones
            chosen = candidate
    return chosen
def merge_channels(channels_list):
    """Combine several channel priority lists into one ordered list.

    Every channel becomes a node of a directed graph and every pair of
    neighbouring channels in a list becomes an edge, so a topological sort of
    the graph keeps the relative order given by each environment file.

    `channels_list` is an iterable of channel lists; a None entry means the
    file had no channels and is skipped. Returns the sorted channel list,
    which is empty when no channels were given at all.
    Raises MergeError if the priorities cannot all be satisfied.
    """
    dag = DAG()
    try:
        for channels in channels_list:
            if channels is not None:
                for position, channel in enumerate(channels):
                    dag.add_node(channel)
                    if position > 0:
                        # the previous channel has priority over this one
                        dag.add_edge(channels[position - 1], channel)
        ordering = dag.topological_sort()
    except ValueError as exc:
        raise MergeError("Can't satisfy channels priority: {}".format(exc.args[0]))
    return ordering
def merge_dependencies(deps_list):
    """Merge all dependencies to one list and return it.

    `deps_list` is an iterable of dependency lists, one per environment file;
    a None entry means the file had no dependencies and is skipped.

    Two overlapping dependencies (e.g. package-a and package-a=1.0.0) are not
    unified, and both are left in the list; only exact duplicates are removed.
    Conda itself handles that, so there is no need to do it here.

    The result holds, in this order:
    1. the plain (non-mapping) dependencies, sorted;
    2. mapping entries that carry no pip requirements (a mapping without a
       'pip' key, or with an empty one), in the order they were first seen;
    3. a single `{'pip': [...]}` entry with all pip requirements merged, if
       any file had some.
    """
    plain_deps = []
    other_mappings = []
    pip_sections = []
    for deps in deps_list:
        if deps is None:  # not found in this environment definition
            continue
        for dep in deps:
            if isinstance(dep, dict):
                # .get(): a mapping without a 'pip' key must not raise KeyError
                pip_reqs = dep.get('pip')
                if pip_reqs:
                    pip_sections.append(pip_reqs)
                elif dep not in other_mappings:
                    other_mappings.append(dep)
            elif dep not in plain_deps:
                plain_deps.append(dep)
    # Mappings cannot be ordered against strings, so only the plain entries
    # are sorted and the mappings are appended after them.
    unified_deps = sorted(plain_deps) + other_mappings
    if pip_sections:
        unified_deps.append(merge_pips(pip_sections))
    return unified_deps


def merge_pips(pip_list):
    """Merge pip requirement lists into one `{'pip': [...]}` entry.

    `pip_list` is an iterable of requirement lists. Exact duplicates are
    removed and the remaining requirements are sorted, the same way
    `merge_dependencies` treats plain dependencies.
    """
    unique_reqs = set()
    for reqs in pip_list:
        unique_reqs.update(reqs)
    return {'pip': sorted(unique_reqs)}
class DAG(object):
    """Directed acyclic graph for merging channel priorities.

    An edge `a -> b` means that `a` must come before `b` in the topological
    ordering. Nodes are kept in insertion order.

    This is a stripped down version adopted from:
    https://github.com/thieman/py-dag (MIT license)
    """

    def __init__(self):
        # Maps each node to the set of nodes it points to (its successors).
        self.graph = OrderedDict()

    def __len__(self):
        """Return the number of nodes in the graph."""
        return len(self.graph)

    def add_node(self, node_name):
        """Add `node_name` to the graph; do nothing if it already exists."""
        if node_name not in self.graph:
            self.graph[node_name] = set()

    def add_edge(self, from_node, to_node):
        """Add the edge `from_node -> to_node`, keeping the graph acyclic.

        Raises KeyError if either node is not in the graph.
        Raises ValueError (with the message "from -> to") if the graph would
        not be valid with this edge; in that case the graph is left unchanged.
        """
        if from_node not in self.graph or to_node not in self.graph:
            raise KeyError('one or more nodes do not exist in graph')
        successors = self.graph[from_node]
        is_new_edge = to_node not in successors
        # Validate the graph *with* the edge in place, then roll back if the
        # edge turns out to close a cycle.
        successors.add(to_node)
        if not self.validate():
            if is_new_edge:
                successors.discard(to_node)
            raise ValueError("{} -> {}".format(from_node, to_node))

    @property
    def independent_nodes(self):
        """Return a list of all nodes in the graph with no dependencies."""
        dependent_nodes = set(node for dependents in self.graph.values()
                              for node in dependents)
        return [node for node in self.graph.keys()
                if node not in dependent_nodes]

    def validate(self):
        """Return whether the graph doesn't contain a cycle.

        A graph without any independent node (which includes the empty graph)
        is reported as not valid.
        """
        if len(self.independent_nodes) > 0:
            try:
                self.topological_sort()
                return True
            except ValueError:
                return False
        return False

    def topological_sort(self):
        """Return a topological ordering of the DAG as a list of nodes.

        Nodes that become available at the same time are emitted in the order
        they were added to the graph, so the result does not depend on set
        iteration order.

        Raise ValueError if this is not possible (graph is not valid).
        """
        in_degree = {node: 0 for node in self.graph}
        for successors in self.graph.values():
            for to_node in successors:
                in_degree[to_node] += 1

        # Insertion rank of every node, used to visit successors in a
        # reproducible order instead of the set's hash-dependent order.
        rank = {node: index for index, node in enumerate(self.graph)}

        queue = deque(node for node in self.graph if in_degree[node] == 0)
        sorted_nodes = []
        while queue:
            independent_node = queue.popleft()
            sorted_nodes.append(independent_node)
            for next_node in sorted(self.graph[independent_node],
                                    key=rank.__getitem__):
                in_degree[next_node] -= 1
                if in_degree[next_node] == 0:
                    queue.append(next_node)

        # Nodes on a cycle never reach in-degree 0 and are never emitted.
        if len(sorted_nodes) != len(self.graph):
            raise ValueError('graph is not acyclic')
        return sorted_nodes
def main():
    """Console entry point (used by console_scripts of setup.py).

    Parses the command line and merges the given files. Returns 1 when the
    merge fails with a MergeError, and None otherwise.
    """
    arguments = parse_args()
    try:
        merge_envs(arguments)
    except MergeError:
        return 1
    return None
if __name__ == '__main__':
    # Pass main()'s return value on as the process exit status, so that a
    # failed merge is reported as failure when the file is run as a script.
    sys.exit(main())