1
1
"""Provide an implementation of FASTCC."""
2
2
3
- from typing import TYPE_CHECKING , List , Optional
3
+ from logging import getLogger
4
+ from typing import TYPE_CHECKING , List , Set , Optional
4
5
5
6
from optlang .symbolics import Zero
6
7
10
11
if TYPE_CHECKING :
11
12
from cobra .core import Model , Reaction
12
13
14
+ logger = getLogger (__name__ )
15
+ LARGE_VALUE = 1.0e6
16
+
17
+ def _add_lp7_vars (
18
+ model : "Model" , rxns : List ["Reaction" ], flux_threshold : float
19
+ ) -> None :
20
+ """Add the variables and constraints for the LP.
21
+
22
+ Parameters
23
+ ----------
24
+ model: cobra.Model
25
+ The model to operate on.
26
+ rxns: list of cobra.Reaction
27
+ The reactions to use for LP.
28
+ flux_threshold: float
29
+ The upper threshold an auxiliary variable can have.
30
+
31
+ """
32
+ prob = model .problem
33
+ vars_and_cons = []
34
+
35
+ for rxn in rxns :
36
+ var = prob .Variable (
37
+ "auxiliary_{}" .format (rxn .id ), lb = 0.0 , ub = flux_threshold
38
+ )
39
+ const = prob .Constraint (
40
+ rxn .flux_expression - var ,
41
+ name = "aux_constraint_{}" .format (rxn .id ),
42
+ lb = - LARGE_VALUE ,
43
+ )
44
+ vars_and_cons .extend ([var , const ])
45
+
46
+ model .add_cons_vars (vars_and_cons )
47
+ model .solver .update ()
48
+
13
49
14
50
def _find_sparse_mode (
15
- model : "Model" , rxns : List [ "Reaction" ], flux_threshold : float , zero_cutoff : float
51
+ model : "Model" , rxn_ids : Set [ str ] , zero_cutoff : float
16
52
) -> List ["Reaction" ]:
17
53
"""Perform the LP required for FASTCC.
18
54
@@ -22,8 +58,6 @@ def _find_sparse_mode(
22
58
The model to perform FASTCC on.
23
59
rxns: list of cobra.Reaction
24
60
The reactions to use for LP.
25
- flux_threshold: float
26
- The upper threshold an auxiliary variable can have.
27
61
zero_cutoff: float
28
62
The cutoff below which flux is considered zero.
29
63
@@ -33,36 +67,27 @@ def _find_sparse_mode(
33
67
The list of reactions to consider as consistent.
34
68
35
69
"""
36
- if rxns :
37
- obj_vars = []
38
- vars_and_cons = []
39
- prob = model .problem
40
-
41
- for rxn in rxns :
42
- var = prob .Variable (
43
- "auxiliary_{}" .format (rxn .id ), lb = 0.0 , ub = flux_threshold
44
- )
45
- const = prob .Constraint (
46
- rxn .forward_variable + rxn .reverse_variable - var ,
47
- name = "constraint_{}" .format (rxn .id ),
48
- lb = 0.0 ,
49
- )
50
- vars_and_cons .extend ([var , const ])
51
- obj_vars .append (var )
70
+ if not rxn_ids :
71
+ return set ()
72
+
73
+ # Enable constraints for the reactions
74
+ for rid in rxn_ids :
75
+ model .constraints .get ("aux_constraint_{}" .format (rid )).lb = 0.0
76
+
77
+ obj_vars = [model .variables .get ("auxiliary_{}" .format (rid )) for rid in rxn_ids ]
78
+ model .objective = Zero
79
+ model .objective .set_linear_coefficients ({v : 1.0 for v in obj_vars })
52
80
53
- model .add_cons_vars (vars_and_cons )
54
- model .objective = prob .Objective (Zero , sloppy = True )
55
- model .objective .set_linear_coefficients ({v : 1.0 for v in obj_vars })
81
+ sol = model .optimize (objective_sense = "max" )
56
82
57
- model .optimize (objective_sense = "max" )
58
- result = [rxn for rxn in model .reactions if abs (rxn .flux ) > zero_cutoff ]
59
- else :
60
- result = []
83
+ # Disable constraints for the reactions
84
+ for rid in rxn_ids :
85
+ model .constraints .get ("aux_constraint_{}" .format (rid )).lb = - LARGE_VALUE
61
86
62
- return result
87
+ return set ( sol . fluxes [ sol . fluxes . abs () > zero_cutoff ]. index )
63
88
64
89
65
- def _flip_coefficients (model : "Model" , rxns : List [ "Reaction" ]) -> None :
90
+ def _flip_coefficients (model : "Model" , rxn_ids : Set [ str ]) -> None :
66
91
"""Flip the coefficients for optimizing in reverse direction.
67
92
68
93
Parameters
@@ -73,17 +98,19 @@ def _flip_coefficients(model: "Model", rxns: List["Reaction"]) -> None:
73
98
The list of reactions whose coefficients will be flipped.
74
99
75
100
"""
101
+ if not rxn_ids :
102
+ return
76
103
# flip reactions
77
- for rxn in rxns :
78
- const = model .constraints .get ("constraint_ {}" .format (rxn . id ))
79
- var = model .variables .get ("auxiliary_{}" .format (rxn . id ))
104
+ for rxn in rxn_ids :
105
+ const = model .constraints .get ("aux_constraint_ {}" .format (rxn ))
106
+ var = model .variables .get ("auxiliary_{}" .format (rxn ))
80
107
coefs = const .get_linear_coefficients (const .variables )
81
108
const .set_linear_coefficients ({k : - v for k , v in coefs .items () if k is not var })
109
+ model .solver .update ()
82
110
83
- # flip objective
84
- objective = model .objective
85
- objective_coefs = objective .get_linear_coefficients (objective .variables )
86
- objective .set_linear_coefficients ({k : - v for k , v in objective_coefs .items ()})
111
+ def _any_set (s ):
112
+ for x in s :
113
+ return set ([x ])
87
114
88
115
89
116
def fastcc (
@@ -133,46 +160,60 @@ def fastcc(
133
160
"""
134
161
zero_cutoff = normalize_cutoff (model , zero_cutoff )
135
162
136
- irreversible_rxns = [rxn for rxn in model .reactions if not rxn .reversibility ]
163
+ all_rxns = {rxn .id for rxn in model .reactions }
164
+ irreversible_rxns = {rxn .id for rxn in model .reactions if not rxn .reversibility }
137
165
rxns_to_check = irreversible_rxns
166
+ flipped = False
167
+ singletons = False
138
168
139
169
with model :
170
+ _add_lp7_vars (model , model .reactions , flux_threshold )
171
+
140
172
rxns_to_keep = _find_sparse_mode (
141
- model , rxns_to_check , flux_threshold , zero_cutoff
173
+ model , rxns_to_check , zero_cutoff
174
+ )
175
+ rxns_to_check = all_rxns .difference (rxns_to_keep )
176
+ logger .info (
177
+ "Initial step found %d consistent reactions. "
178
+ "Starting the consistency loop for the remaining %d reactions." ,
179
+ len (rxns_to_keep ), len (rxns_to_check )
142
180
)
143
181
144
- rxns_to_check = list (set (model .reactions ).difference (rxns_to_keep ))
145
-
146
- while rxns_to_check :
147
- with model :
182
+ while rxns_to_check :
183
+ logger .debug (
184
+ "reactions to check: %d - consistent reactions: %d - flipped: %d - singletons: %d" ,
185
+ len (rxns_to_check ), len (rxns_to_keep ), flipped , singletons
186
+ )
187
+ check = _any_set (rxns_to_check ) if singletons else rxns_to_check
148
188
new_rxns = _find_sparse_mode (
149
- model , rxns_to_check , flux_threshold , zero_cutoff
189
+ model , check , zero_cutoff
150
190
)
151
- rxns_to_keep .extend (new_rxns )
152
-
153
- # this condition will be valid for all but the last iteration
154
- if list (set (rxns_to_check ).intersection (rxns_to_keep )):
155
- rxns_to_check = list (set (rxns_to_check ).difference (rxns_to_keep ))
191
+ rxns_to_keep .update (new_rxns )
156
192
193
+ if rxns_to_check .intersection (rxns_to_keep ):
194
+ rxns_to_check = rxns_to_check .difference (rxns_to_keep )
195
+ flipped = False
157
196
else :
158
- rxns_to_flip = list (set (rxns_to_check ).difference (irreversible_rxns ))
159
- _flip_coefficients (model , rxns_to_flip )
160
- sol = model .optimize (min )
161
- to_add_rxns = sol .fluxes .index [sol .fluxes .abs () > zero_cutoff ].tolist ()
162
- rxns_to_keep .extend (
163
- [model .reactions .get_by_id (rxn ) for rxn in to_add_rxns ]
164
- )
165
- # since this is the last iteration, it needs to break or else
166
- # it will run forever since rxns_to_check won't be empty
167
- break
168
-
169
- consistent_rxns = set (rxns_to_keep )
170
- # need the ids since Reaction objects are created fresh with model.copy()
171
- rxns_to_remove = [
172
- rxn .id for rxn in set (model .reactions ).difference (consistent_rxns )
173
- ]
197
+ check_irr = check .difference (irreversible_rxns )
198
+ if flipped or not check_irr :
199
+ if singletons :
200
+ logger .debug ("%s is inconsistent" , check )
201
+ rxns_to_check = rxns_to_check .difference (check )
202
+ flipped = False
203
+ singletons = True
204
+ else :
205
+ flipped = True
206
+ check = _any_set (rxns_to_check ) if singletons else rxns_to_check
207
+ _flip_coefficients (model , check_irr )
208
+ logger .info (
209
+ "Final - consistent reactions: %d - inconsistent reactions: %d [eps=%.2g, tol=%.2g]" ,
210
+ len (rxns_to_keep ), len (all_rxns ) - len (rxns_to_keep ), flux_threshold , zero_cutoff
211
+ )
174
212
175
213
consistent_model = model .copy ()
176
- consistent_model .remove_reactions (rxns_to_remove , remove_orphans = True )
214
+ consistent_model .remove_reactions (
215
+ all_rxns .difference (rxns_to_keep ),
216
+ remove_orphans = True
217
+ )
177
218
178
219
return consistent_model
0 commit comments