-
Notifications
You must be signed in to change notification settings - Fork 0
/
min_attack_solver.py
84 lines (72 loc) · 3.39 KB
/
min_attack_solver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
'''
Min attack solver:
Optimizing for the Optimal Attack problem with the availabilities fixed
'''
import cplex_interface
import min_cost_flow_solver as mcf
import numpy as np
__author__ = 'jeromethai'
class MinAttackSolver:
    """Compute the minimum-cost attack for fixed target availabilities.

    The problem is cast into a min-cost flow problem whose solution (a flow
    matrix) is converted back into attack rates and routing probabilities.

    Attributes:
        network: network object; must expose ``rates``, ``routing``, ``size``
            and ``full_adjacency`` or ``adjacency`` (whichever is selected).
        a: target availabilities (fixed), indexed by station.
        cost: N x N matrix of costs on the rates of attacks.
        phi: rates before the attacks (``network.rates``).
        delta: routing probabilities before the attacks (``network.routing``).
        eps: numerical tolerance used for positivity / zero tests.
        cplex: stored flag; ``solve`` currently always uses the CPLEX solver.
        N: number of stations (``network.size``).
        full_adj: whether the full adjacency matrix is used.
        adj: adjacency matrix handed to the min-cost flow solver.
    """

    def __init__(self, network, target_availabilities, cost, full_adj=True,
                 eps=1e-8, cplex=True):
        """Store the problem data and validate it with ``check``.

        Args:
            network: network object (see class docstring).
            target_availabilities: array of length ``network.size``.
            cost: ``network.size`` x ``network.size`` cost matrix.
            full_adj: use ``network.full_adjacency`` when true, otherwise
                ``network.adjacency``.
            eps: numerical tolerance.
            cplex: kept for interface compatibility; not consulted by
                ``solve`` at the moment.

        Raises:
            AssertionError: if ``check`` rejects the inputs.
        """
        self.network = network
        self.a = target_availabilities  # availabilities are fixed
        self.cost = cost  # cost on the rates of attacks
        self.phi = network.rates  # rates before the attacks
        self.delta = network.routing  # routing prob. before attacks
        self.eps = eps
        self.cplex = cplex
        self.N = network.size
        self.full_adj = full_adj
        self.adj = network.full_adjacency if full_adj else network.adjacency
        self.check()

    def check(self):
        """Validate sizes and positivity of the availabilities and costs.

        Raises:
            AssertionError: if a size does not match ``self.N``, if the
                largest availability is not exactly 1.0, or if any
                availability or cost is smaller than ``self.eps``.
        """
        # check that the target is the right size and positive
        assert self.a.shape[0] == self.N, 'availabilities do not match network size'
        # NOTE(review): the test requires max == 1.0 while the message says
        # "<= 1.0" -- presumably availabilities are normalized; confirm.
        assert np.max(self.a) == 1.0, 'availabilities not <= 1.0'
        assert np.min(self.a) >= self.eps, 'availabilities not positive'
        # check that costs are the right size
        assert self.cost.shape[0] == self.N, 'costs do not match network size on 0 axis'
        assert self.cost.shape[1] == self.N, 'costs do not match network size on 1 axis'
        assert np.min(self.cost) >= self.eps, 'costs not positive'

    def min_cost_flow_init(self):
        """Build the parameters of the equivalent min-cost flow problem.

        Returns:
            tuple: ``(coeff, sources)`` where ``coeff[i, j]`` is
            ``cost[i, j] / a[i]`` (objective coefficients) and ``sources`` is
            ``phi * a - delta^T (phi * a)`` (source terms).

        Raises:
            AssertionError: if the source terms do not sum to zero within
                ``self.eps``.
        """
        # print(...) with a single string behaves identically on Python 2
        # and Python 3, unlike the print statement.
        print('initialize paramaters of the LP ...')
        # compute source terms
        pickup_rates = np.multiply(self.phi, self.a)
        sources = pickup_rates - np.dot(self.delta.transpose(), pickup_rates)
        assert abs(np.sum(sources)) < self.eps
        # compute coefficients in the objective: scale row i of the cost
        # matrix by 1 / a[i] (broadcasting instead of a dense diag product)
        inverse_target = np.divide(np.ones((self.N,)), self.a)
        coeff = np.multiply(inverse_target[:, np.newaxis], self.cost)
        return coeff, sources

    def flow_to_rates_routing(self, flow, previous_routing=None):
        """Convert a min-cost flow solution into rates and routing.

        Args:
            flow: N x N flow matrix; it is copied, not modified.
            previous_routing: unused; kept for interface compatibility.

        Returns:
            tuple: ``(opt_rates, opt_routing)``. ``opt_rates[i]`` is the
            off-diagonal outflow of row ``i`` divided by ``a[i]``.
            ``opt_routing`` is the row-normalized flow with a zero diagonal;
            rows whose outflow is below ``self.eps`` are routed according to
            the corresponding row of ``self.adj`` (diagonal excluded).
            Negative entries of both outputs are clipped to zero.
        """
        # work on a copy so the caller's array is left untouched
        flow = np.array(flow, dtype=float)
        # makes sure that the diagonal is equal to zero
        np.fill_diagonal(flow, 0.0)
        outflow = np.sum(flow, axis=1)
        opt_rates = np.divide(outflow, self.a)
        # rows without outflow cannot be normalized: fall back on adjacency
        idle = np.where(outflow < self.eps)[0]
        flow[idle, :] = self.adj[idle, :]
        np.fill_diagonal(flow, 0.0)
        # normalize by the row sums taken AFTER the diagonal was cleared, so
        # that fallback rows are consistent with the zeroed diagonal
        normalizer = np.sum(flow, axis=1)
        opt_routing = flow / normalizer[:, np.newaxis]
        opt_rates[opt_rates < 0.0] = 0.0
        opt_routing[opt_routing < 0.0] = 0.0
        return opt_rates, opt_routing

    def solve(self):
        """Solve the min attack problem.

        Returns:
            tuple: ``(opt_rates, opt_routing)`` as produced by
            ``flow_to_rates_routing``.
        """
        print('start min_attack_solver ...')
        # initialize the parameters for the min-cost-flow problem
        coeff, sources = self.min_cost_flow_init()
        # runs the min-cost-flow problem; the CPLEX solver is always used,
        # regardless of self.cplex
        flow = mcf.cplex_solver(coeff, sources, self.adj)
        return self.flow_to_rates_routing(flow)