Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FastCC #1428

Open
wants to merge 2 commits into
base: devel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 106 additions & 65 deletions src/cobra/flux_analysis/fastcc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Provide an implementation of FASTCC."""

from typing import TYPE_CHECKING, List, Optional
from logging import getLogger
from typing import TYPE_CHECKING, List, Optional, Set

from optlang.symbolics import Zero

Expand All @@ -10,9 +11,44 @@
if TYPE_CHECKING:
from cobra.core import Model, Reaction

logger = getLogger(__name__)
LARGE_VALUE = 1.0e6
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Floating constant; in the matlab implementation this is derived from the solver tolerance (which I don't like that much either), would there be anything against directly adding this as optional argument for the functions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is not the tolerance (it's 1e6, not 1e-6). To deactivate the constraints I just want to open their bounds, but optlang does not allow unbounded constraints, so I set a very large (in absolute value) lower bound here to turn off the constraint.


def _add_lp7_vars(
    model: "Model", rxns: List["Reaction"], flux_threshold: float
) -> None:
    """Add the auxiliary variables and constraints required for LP-7.

    For every reaction an auxiliary variable bounded by
    ``[0, flux_threshold]`` is created together with the constraint
    ``flux_expression - auxiliary >= lb``.  Each constraint is created
    in the deactivated state (``lb = -LARGE_VALUE``) so that individual
    reactions can later be switched on and off cheaply by tightening or
    relaxing the bound instead of adding and removing constraints.

    Parameters
    ----------
    model : cobra.Model
        The model to operate on.
    rxns : list of cobra.Reaction
        The reactions to create auxiliary variables and constraints for.
    flux_threshold : float
        The upper threshold an auxiliary variable can have.

    """
    prob = model.problem
    vars_and_cons = []

    for rxn in rxns:
        var = prob.Variable(f"auxiliary_{rxn.id}", lb=0.0, ub=flux_threshold)
        # optlang does not allow unbounded constraints, so a very large
        # (in absolute value) negative lower bound represents
        # "deactivated" here.
        const = prob.Constraint(
            rxn.flux_expression - var,
            name=f"aux_constraint_{rxn.id}",
            lb=-LARGE_VALUE,
        )
        vars_and_cons.extend([var, const])

    model.add_cons_vars(vars_and_cons)
    # One solver update after the loop is much faster than one per reaction.
    model.solver.update()


def _find_sparse_mode(
    model: "Model", rxn_ids: Set[str], zero_cutoff: float
) -> Set[str]:
    """Perform the LP required for FASTCC.

    Maximize the number of reactions in ``rxn_ids`` that carry a flux of
    at least the auxiliary threshold.  The auxiliary constraints created
    by :func:`_add_lp7_vars` are enabled before the optimization and
    disabled again afterwards, leaving the model in its previous state.

    Parameters
    ----------
    model : cobra.Model
        The model to perform FASTCC on.  It must already contain the
        auxiliary variables and constraints added by
        :func:`_add_lp7_vars`.
    rxn_ids : set of str
        The identifiers of the reactions to check.
    zero_cutoff : float
        The cutoff below which a flux is considered zero.

    Returns
    -------
    set of str
        The identifiers of the reactions to consider as consistent.

    """
    if not rxn_ids:
        return set()

    # Enable the auxiliary constraints for the requested reactions.
    for rid in rxn_ids:
        model.constraints.get(f"aux_constraint_{rid}").lb = 0.0

    obj_vars = [model.variables.get(f"auxiliary_{rid}") for rid in rxn_ids]
    model.objective = Zero
    model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})

    sol = model.optimize(objective_sense="max")

    # Disable the constraints again.  Relaxing a bound is a constant-time
    # operation, much cheaper than removing and re-adding constraints.
    for rid in rxn_ids:
        model.constraints.get(f"aux_constraint_{rid}").lb = -LARGE_VALUE

    return set(sol.fluxes[sol.fluxes.abs() > zero_cutoff].index)


def _flip_coefficients(model: "Model", rxn_ids: Set[str]) -> None:
    """Flip the constraint coefficients to optimize the reverse direction.

    For each reaction's auxiliary constraint, negate every coefficient
    except the one of the auxiliary variable itself, so that the next
    optimization probes the reverse flux direction.

    Parameters
    ----------
    model : cobra.Model
        The model to operate on.
    rxn_ids : set of str
        The identifiers of the reactions whose coefficients will be
        flipped.

    """
    if not rxn_ids:
        return

    for rid in rxn_ids:
        const = model.constraints.get(f"aux_constraint_{rid}")
        var = model.variables.get(f"auxiliary_{rid}")
        coefs = const.get_linear_coefficients(const.variables)
        # Keep the auxiliary variable's coefficient untouched; only the
        # flux part of the expression changes sign.
        const.set_linear_coefficients(
            {k: -v for k, v in coefs.items() if k is not var}
        )
    model.solver.update()
def _any_set(s: Set) -> Set:
    """Return a singleton set containing one arbitrary element of ``s``.

    Parameters
    ----------
    s : set
        The set to pick an element from.

    Returns
    -------
    set
        A set with exactly one (arbitrary) element of ``s``, or the
        empty set if ``s`` is empty.  (The previous version implicitly
        returned ``None`` for an empty input, which no caller relied
        on; an empty set is safer for set arithmetic.)

    """
    for element in s:
        return {element}
    return set()


def fastcc(
Expand Down Expand Up @@ -133,46 +160,60 @@ def fastcc(
"""
zero_cutoff = normalize_cutoff(model, zero_cutoff)

irreversible_rxns = [rxn for rxn in model.reactions if not rxn.reversibility]
all_rxns = {rxn.id for rxn in model.reactions}
irreversible_rxns = {rxn.id for rxn in model.reactions if not rxn.reversibility}
rxns_to_check = irreversible_rxns
flipped = False
singletons = False

with model:
_add_lp7_vars(model, model.reactions, flux_threshold)

rxns_to_keep = _find_sparse_mode(
model, rxns_to_check, flux_threshold, zero_cutoff
model, rxns_to_check, zero_cutoff
)
rxns_to_check = all_rxns.difference(rxns_to_keep)
logger.info(
"Initial step found %d consistent reactions. "
"Starting the consistency loop for the remaining %d reactions.",
len(rxns_to_keep), len(rxns_to_check)
)

rxns_to_check = list(set(model.reactions).difference(rxns_to_keep))

while rxns_to_check:
with model:
while rxns_to_check:
logger.debug(
"reactions to check: %d - consistent reactions: %d - flipped: %d - singletons: %d",
len(rxns_to_check), len(rxns_to_keep), flipped, singletons
)
check = _any_set(rxns_to_check) if singletons else rxns_to_check
new_rxns = _find_sparse_mode(
model, rxns_to_check, flux_threshold, zero_cutoff
model, check, zero_cutoff
)
rxns_to_keep.extend(new_rxns)

# this condition will be valid for all but the last iteration
if list(set(rxns_to_check).intersection(rxns_to_keep)):
rxns_to_check = list(set(rxns_to_check).difference(rxns_to_keep))
rxns_to_keep.update(new_rxns)

if rxns_to_check.intersection(rxns_to_keep):
rxns_to_check = rxns_to_check.difference(rxns_to_keep)
flipped = False
else:
rxns_to_flip = list(set(rxns_to_check).difference(irreversible_rxns))
_flip_coefficients(model, rxns_to_flip)
sol = model.optimize(min)
to_add_rxns = sol.fluxes.index[sol.fluxes.abs() > zero_cutoff].tolist()
rxns_to_keep.extend(
[model.reactions.get_by_id(rxn) for rxn in to_add_rxns]
)
# since this is the last iteration, it needs to break or else
# it will run forever since rxns_to_check won't be empty
break

consistent_rxns = set(rxns_to_keep)
# need the ids since Reaction objects are created fresh with model.copy()
rxns_to_remove = [
rxn.id for rxn in set(model.reactions).difference(consistent_rxns)
]
check_irr = check.difference(irreversible_rxns)
if flipped or not check_irr:
if singletons:
logger.debug("%s is inconsistent", check)
rxns_to_check = rxns_to_check.difference(check)
flipped = False
singletons = True
else:
flipped = True
check = _any_set(rxns_to_check) if singletons else rxns_to_check
_flip_coefficients(model, check_irr)
logger.info(
"Final - consistent reactions: %d - inconsistent reactions: %d [eps=%.2g, tol=%.2g]",
len(rxns_to_keep), len(all_rxns) - len(rxns_to_keep), flux_threshold, zero_cutoff
)

consistent_model = model.copy()
consistent_model.remove_reactions(rxns_to_remove, remove_orphans=True)
consistent_model.remove_reactions(
all_rxns.difference(rxns_to_keep),
remove_orphans=True
)

return consistent_model
Loading