From 1346c483264790e9cc050955916e999dc5f09717 Mon Sep 17 00:00:00 2001 From: "Pablo R. Mier" Date: Wed, 4 Sep 2024 12:44:58 +0200 Subject: [PATCH] Add carnival v3 for testing purposes on L0 reg with acyclic signals --- corneto/methods/carnival.py | 87 +++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/corneto/methods/carnival.py b/corneto/methods/carnival.py index 4a58a037..4943d500 100644 --- a/corneto/methods/carnival.py +++ b/corneto/methods/carnival.py @@ -483,6 +483,93 @@ def bfs_search( return selected_edges, paths, stats +def create_flow_carnival_v3(G, exp_list, lambd=0.2): + # This is the flow acyclic signal + + At, Ah = get_incidence_matrices_of_edges(G) + interaction = get_interactions(G) + + # Create a single unique flow. The flow selects the + # subset of the PKN that will contain all signal propagations. + P = cn.opt.Flow(G) + + Eact = cn.opt.Variable( + "edge_activates", (G.num_edges, len(exp_list)), vartype=cn.VarType.BINARY + ) + Einh = cn.opt.Variable( + "edge_inhibits", (G.num_edges, len(exp_list)), vartype=cn.VarType.BINARY + ) + # This var should be removed + Z = cn.opt.Variable( + "dummy", (G.num_vertices, len(exp_list)), vartype=cn.VarType.CONTINUOUS + ) + P += Z >= 0 + + # Edge cannot activate and inhibit at the same time + P += Eact + Einh <= 1 + + # The value of a vertex is the difference of the positive and negative incoming edges + Va = At @ Eact + Vi = At @ Einh + V = Va - Vi + P.register("vertex_value", V) + P.register("vertex_inhibited", Vi) + P.register("vertex_activated", Va) + P.register("edge_value", Eact - Einh) + P.register("edge_has_signal", Eact + Einh) + + # Add acyclic constraints on the edge_has_signal (signal) + P = cn.opt.Acyclic(G, P, indicator_positive_var_name="edge_has_signal") + + edges_with_head = np.flatnonzero(np.sum(np.abs(Ah), axis=0) > 0) + + for exp, iexp in zip(exp_list, range(len(exp_list))): + # Edge cannot activate or inhibit downstream vertices if it is not carrying flow + P += Eact[:, iexp] + Einh[:, iexp] <= P.expr.with_flow + + P += Eact[edges_with_head, iexp] <= (Ah.T @ Va)[edges_with_head, iexp].multiply( + interaction[edges_with_head] > 0 + ) + (Ah.T @ Vi)[edges_with_head, iexp].multiply( + interaction[edges_with_head] < 0 + ) # constrain 1B + P += Einh[edges_with_head, iexp] <= (Ah.T @ Va)[edges_with_head, iexp].multiply( + interaction[edges_with_head] < 0 + ) + (Ah.T @ Vi)[edges_with_head, iexp].multiply( + interaction[edges_with_head] > 0 + ) # constrain 2B + + # perturbation: + p_nodes = list(exp_list[exp]["input"].keys()) + p_values = list(exp_list[exp]["input"].values()) + p_nodes_positions = [G.V.index(key) for key in p_nodes] + + P += V[p_nodes_positions, iexp] == p_values + + # measuremenents: + m_nodes = list(exp_list[exp]["output"].keys()) + m_values = np.array(list(exp_list[exp]["output"].values())) + m_nodes_positions = [G.V.index(key) for key in m_nodes] + + # linearization of the ABS function: https://optimization.cbe.cornell.edu/index.php?title=Optimization_with_absolute_values + P += ( + V[m_nodes_positions, iexp] - np.sign(m_values) <= Z[m_nodes_positions, iexp] + ) + P += ( + -V[m_nodes_positions, iexp] + np.sign(m_values) + <= Z[m_nodes_positions, iexp] + ) + + P.add_objectives(sum(Z[m_nodes_positions, iexp].multiply(abs(m_values)))) + if lambd > 0: + P += cn.opt.linear_or( + Eact + Einh, axis=1, varname="edge_has_signal_in_any_sample" + ) + P.add_objectives(sum(P.expr.edge_has_signal_in_any_sample), weights=lambd) + else: + P.add_objectives(0) + return P + + def create_flow_carnival_v2(G, exp_list, lambd=0.2): # This is the flow acyclic signal At, Ah = get_incidence_matrices_of_edges(G)