Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

biosustain / cameo / 963

15 Feb 2016 - 14:59 coverage decreased (-4.3%) to 62.885%
963

Pull #41

travis-ci

Daaf0308d36e60bacd269a6f70b6dd70?size=18&default=identiconphantomas1234
Merge branch 'devel' into structural_analysis_2
Pull Request #41: Second try on implementing the enumeration of shortest elementary flux modes and MCS

431 of 908 new or added lines in 38 files covered. (47.47%)

678 existing lines in 28 files now uncovered.

3143 of 4998 relevant lines covered (62.89%)

1.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.11
/cameo/strain_design/deterministic/linear_programming.py
1
# Copyright 2015 Novo Nordisk Foundation Center for Biosustainability, DTU.
2

3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6

7
# http://www.apache.org/licenses/LICENSE-2.0
8

9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
from __future__ import print_function
2×
16

17
import warnings
2×
18
import numpy
2×
19
import logging
2×
20

21
from cameo.visualization import plotting
2×
22

23
from pandas import DataFrame
2×
24

25
from functools import partial
2×
26

27
from sympy import Add
2×
28

29
from cameo import ui
2×
30
from cameo import config
2×
31

32
from cameo.util import TimeMachine
2×
33

34
from cameo.flux_analysis import flux_balance_impact_degree, phenotypic_phase_plane, flux_variability_analysis
2×
35
from cameo.exceptions import SolveError
2×
36

37
from cameo.core.solver_based_model import SolverBasedModel
2×
38
from cameo.core.solver_based_model_dual import convert_to_dual
2×
39
from cameo.core.reaction import Reaction
2×
40

41
from cameo.strain_design.strain_design import StrainDesignMethod, StrainDesign, StrainDesignResult
2×
42

43
from IProgress.progressbar import ProgressBar
2×
44
from IProgress.widgets import Bar, Percentage
2×
45

46
logger = logging.getLogger(__name__)
2×
47

48

49
__all__ = ["OptKnock"]
2×
50

51

52
class OptKnock(StrainDesignMethod):
2×
53
    """
54
    OptKnock.
55

56
    OptKnock solves a bi-level optimization problem, finding the set of knockouts that allows maximal
57
    target production under optimal growth.
58

59
    Parameters
60
    ----------
61
    model : SolverBasedModel
62
        A model to be used for finding optimal knockouts. Always set a non-zero lower bound on
63
        biomass reaction before using OptKnock.
64
    exclude_reactions : iterable of str or Reaction objects
65
        Reactions that will not be knocked out. Excluding reactions can give more realistic results
66
        and decrease running time. Essential reactions and exchanges are always excluded.
67
    remove_blocked : boolean (default True)
68
        If True, reactions that cannot carry flux (determined by FVA) will be removed from the model.
69
        This reduces running time significantly.
70

71
    Examples
72
    --------
73
    >>> from cameo import models
74
    >>> from cameo.strain_design.deterministic import OptKnock
75
    >>> model = models.bigg.e_coli_core
76
    >>> model.reactions.Biomass_Ecoli_core_w_GAM.lower_bound = 0.1
77
    >>> model.solver = "cplex" # Using cplex is recommended
78
    >>> optknock = OptKnock(model)
79
    >>> result = optknock.run(k=2, target="EX_ac_e", max_results=3)
80
    """
81
    def __init__(self, model, exclude_reactions=None, remove_blocked=True, fraction_of_optimum=None, *args, **kwargs):
2×
82
        assert isinstance(model, SolverBasedModel)
2×
83
        super(OptKnock, self).__init__(*args, **kwargs)
2×
84
        self._model = model.copy()
2×
85
        self._original_model = model
2×
86

87
        if "cplex" in config.solvers:
2×
88
            logger.debug("Changing solver to cplex and tweaking some parameters.")
2×
89
            self._model.solver = "cplex"
2×
90
            problem = self._model.solver.problem
2×
91
            problem.parameters.mip.strategy.startalgorithm.set(1)
2×
92
            problem.parameters.simplex.tolerances.feasibility.set(1e-8)
2×
93
            problem.parameters.simplex.tolerances.optimality.set(1e-8)
2×
94
            problem.parameters.mip.tolerances.integrality.set(1e-8)
2×
95
            problem.parameters.mip.tolerances.absmipgap.set(1e-8)
2×
96
            problem.parameters.mip.tolerances.mipgap.set(1e-8)
2×
97
        else:
UNCOV
98
            warnings.warn("You are trying to run OptKnock with %s. This might not end well." %
!
99
                          self._model.solver.interface.__name__.split(".")[-1])
100

101
        if fraction_of_optimum is not None:
2×
UNCOV
102
            self._model.fix_objective_as_constraint(fraction=fraction_of_optimum)
!
103
        if remove_blocked:
2×
104
            self._remove_blocked_reactions()
2×
105

106
        self._build_problem(exclude_reactions)
2×
107

108
    def _remove_blocked_reactions(self):
2×
109
        fva_res = flux_variability_analysis(self._model, fraction_of_optimum=0)
2×
110
        blocked = [
2×
111
            self._model.reactions.get_by_id(reaction) for reaction, row in fva_res.data_frame.iterrows()
112
            if (round(row["lower_bound"], config.ndecimals) ==
113
                round(row["upper_bound"], config.ndecimals) == 0)
114
        ]
115
        self._model.remove_reactions(blocked)
2×
116

117
    def _build_problem(self, essential_reactions):
2×
118
        logger.debug("Starting to formulate OptKnock problem")
2×
119

120
        self.essential_reactions = self._model.essential_reactions() + self._model.exchanges
2×
121
        if essential_reactions:
2×
UNCOV
122
            for ess_reac in essential_reactions:
!
NEW
123
                if isinstance(ess_reac, Reaction):
!
NEW
124
                    essential_reactions.append(self._model.reactions.get_by_id(ess_reac.id))
!
125
                elif isinstance(essential_reactions, str):
!
126
                    essential_reactions.append(self._model.reactions.get_by_id(ess_reac))
!
127
                else:
128
                    raise TypeError(
!
129
                        "Excluded reactions must be an iterable of reactions or strings. Got object of type " +
130
                        str(type(ess_reac))
131
                    )
UNCOV
132
            self.essential_reactions += essential_reactions
!
133

134
        self._make_dual()
2×
135

136
        self._combine_primal_and_dual()
2×
137
        logger.debug("Primal and dual successfully combined")
2×
138

139
        y_vars = {}
2×
140
        constrained_dual_vars = set()
2×
141
        for reaction in self._model.reactions:
2×
142
            if reaction not in self.essential_reactions and reaction.lower_bound <= 0 <= reaction.upper_bound:
2×
143
                y_var, constrained_vars = self._add_knockout_constraints(reaction)
2×
144
                y_vars[y_var] = reaction
2×
145
                constrained_dual_vars.update(constrained_vars)
2×
146
        self._y_vars = y_vars
2×
147

148
        primal_objective = self._model.solver.objective
2×
149
        dual_objective = self._model.solver.interface.Objective.clone(
2×
150
            self._dual_problem.objective, model=self._model.solver)
151

152
        reduced_expression = Add(*((c * v) for v, c in dual_objective.expression.as_coefficients_dict().items()
2×
153
                                   if v not in constrained_dual_vars))
154
        dual_objective = self._model.solver.interface.Objective(reduced_expression, direction=dual_objective.direction)
2×
155

156
        optimality_constraint = self._model.solver.interface.Constraint(
2×
157
            primal_objective.expression - dual_objective.expression,
158
            lb=0, ub=0, name="inner_optimality")
159
        self._model.solver.add(optimality_constraint)
2×
160
        logger.debug("Inner optimality constrained")
2×
161

162
        logger.debug("Adding constraint for number of knockouts")
2×
163
        knockout_number_constraint = self._model.solver.interface.Constraint(
2×
164
            Add(*y_vars), lb=len(y_vars), ub=len(y_vars)
165
        )
166
        self._model.solver.add(knockout_number_constraint)
2×
167
        self._number_of_knockouts_constraint = knockout_number_constraint
2×
168

169
    def _make_dual(self):
2×
170
        dual_problem = convert_to_dual(self._model.solver)
2×
171
        self._dual_problem = dual_problem
2×
172
        logger.debug("Dual problem successfully created")
2×
173

174
    def _combine_primal_and_dual(self):
2×
175
        primal_problem = self._model.solver
2×
176
        dual_problem = self._dual_problem
2×
177

178
        for var in dual_problem.variables:
2×
179
            var = primal_problem.interface.Variable.clone(var)
2×
180
            primal_problem.add(var)
2×
181
        for const in dual_problem.constraints:
2×
182
            const = primal_problem.interface.Constraint.clone(const, model=primal_problem)
2×
183
            primal_problem.add(const)
2×
184

185
    def _add_knockout_constraints(self, reaction):
2×
186
        interface = self._model.solver.interface
2×
187
        y_var = interface.Variable("y_"+reaction.id, type="binary")
2×
188

189
        self._model.solver.add(interface.Constraint(reaction.flux_expression-1000*y_var, ub=0))
2×
190
        self._model.solver.add(interface.Constraint(reaction.flux_expression+1000*y_var, lb=0))
2×
191

192
        constrained_vars = []
2×
193

194
        if reaction.upper_bound != 0:
2×
195
            dual_forward_ub = self._model.solver.variables["dual_"+reaction.forward_variable.name+"_ub"]
2×
196
            self._model.solver.add(interface.Constraint(dual_forward_ub-1000*(1-y_var), ub=0))
2×
197
            constrained_vars.append(dual_forward_ub)
2×
198
        if reaction.lower_bound != 0:
2×
199
            dual_reverse_ub = self._model.solver.variables["dual_"+reaction.reverse_variable.name+"_ub"]
2×
200
            self._model.solver.add(interface.Constraint(dual_reverse_ub - 1000*(1-y_var), ub=0))
2×
201
            constrained_vars.append(dual_reverse_ub)
2×
202

203
        return y_var, constrained_vars
2×
204

205
    def run(self, max_knockouts=5, biomass=None, target=None, max_results=1, *args, **kwargs):
2×
206
        """
207
        Perform the OptKnock simulation
208

209
        Parameters
210
        ----------
211
        target: str, Metabolite or Reaction
212
            The design target
213
        biomass: str, Metabolite or Reaction
214
            The biomass definition in the model
215
        max_knockouts: int
216
            Max number of knockouts allowed
217
        max_results: int
218
            Max number of different designs to return if found
219

220
        Returns
221
        -------
222
        OptKnockResult
223
        """
224

225
        target = self._model.reaction_for(target, add=False)
2×
226
        biomass = self._model.reaction_for(biomass, add=False)
2×
227

228
        knockout_list = []
2×
229
        fluxes_list = []
2×
230
        production_list = []
2×
231
        biomass_list = []
2×
232
        loader_id = ui.loading()
2×
233
        with TimeMachine() as tm:
2×
234
            self._model.objective = target.id
2×
235
            self._number_of_knockouts_constraint.lb = self._number_of_knockouts_constraint.ub - max_knockouts
2×
236
            count = 0
2×
237
            while count < max_results:
2×
238
                try:
2×
239
                    solution = self._model.solve()
2×
UNCOV
240
                except SolveError as e:
!
UNCOV
241
                    logger.debug("Problem could not be solved. Terminating and returning "+str(count)+" solutions")
!
242
                    logger.debug(str(e))
!
NEW
243
                    break
!
244

245
                knockouts = set(reaction.id for y, reaction in self._y_vars.items() if round(y.primal, 3) == 0)
2×
246
                assert len(knockouts) <= max_knockouts
2×
247

248
                knockout_list.append(knockouts)
2×
249
                fluxes_list.append(solution.fluxes)
2×
250
                production_list.append(solution.f)
2×
251
                biomass_list.append(solution.fluxes[biomass.id])
2×
252

253
                # Add an integer cut
254
                y_vars_to_cut = [y for y in self._y_vars if round(y.primal, 3) == 0]
2×
255
                integer_cut = self._model.solver.interface.Constraint(Add(*y_vars_to_cut),
2×
256
                                                                      lb=1,
257
                                                                      name="integer_cut_"+str(count))
258

259
                if len(knockouts) < max_knockouts:
2×
UNCOV
260
                    self._number_of_knockouts_constraint.lb = self._number_of_knockouts_constraint.ub - len(knockouts)
!
261

262
                tm(do=partial(self._model.solver.add, integer_cut),
2×
263
                   undo=partial(self._model.solver.remove, integer_cut))
264
                count += 1
2×
265

266
            ui.stop_loader(loader_id)
2×
267
            return OptKnockResult(self._original_model, knockout_list, fluxes_list,
2×
268
                                  production_list, biomass_list, target.id, biomass)
269

270

271
class RobustKnock(StrainDesignMethod):
2×
272
    pass
2×
273

274

275
class OptKnockResult(StrainDesignResult):
2×
276
    def __init__(self, model, designs, fluxes, production_fluxes, biomass_fluxes, target, biomass, *args, **kwargs):
2×
277
        super(OptKnockResult, self).__init__(*args, **kwargs)
2×
278
        self._model = model
2×
279
        self._designs = designs
2×
280
        self._fluxes = fluxes
2×
281
        self._production_fluxes = production_fluxes
2×
282
        self._biomass_fluxes = biomass_fluxes
2×
283
        self._target = target
2×
284
        self._biomass = biomass
2×
285
        self._processed_knockouts = None
2×
286

287
    def _process_knockouts(self):
2×
288
        progress = ProgressBar(maxval=len(self._designs), widgets=["Processing solutions: ", Bar(), Percentage()])
2×
289

290
        self._processed_knockouts = DataFrame(columns=["knockouts", "size", "biomass",
2×
291
                                                       self._target, "fva_min", "fva_max"])
292

293
        for i, knockouts in progress(enumerate(self._designs)):
2×
294
            try:
2×
295
                fva = flux_variability_analysis(self._model, fraction_of_optimum=0.99, reactions=[self.target])
2×
296
                self._processed_knockouts.loc[i] = [knockouts, len(knockouts), self.production[i], self.biomass[i],
2×
297
                                                    fva.lower_bound(self.target), fva.upper_bound(self.target)]
NEW
298
            except SolveError:
!
NEW
299
                self._processed_knockouts.loc[i] = [numpy.nan for _ in self._processed_knockouts.columns]
!
300

301
    @property
2×
302
    def knockouts(self):
303
        return self._designs
2×
304

305
    @property
2×
306
    def fluxes(self):
UNCOV
307
        return self._fluxes
!
308

309
    @property
2×
310
    def production(self):
311
        return self._production_fluxes
2×
312

313
    @property
2×
314
    def biomass(self):
315
        return self._biomass_fluxes
2×
316

317
    @property
2×
318
    def target(self):
319
        return self._target
2×
320

321
    def plot(self, index, grid=None, width=None, height=None, title=None, *args, **kwargs):
2×
UNCOV
322
        wt_production = phenotypic_phase_plane(self._model, objective=self._target, variables=[self._biomass.id])
!
UNCOV
323
        with TimeMachine() as tm:
!
UNCOV
324
            for ko in self._designs[index]:
!
NEW
325
                self._model.reactions.get_by_id(ko).knock_out(tm)
!
326

NEW
327
            mt_production = phenotypic_phase_plane(self._model, objective=self._target, variables=[self._biomass.id])
!
NEW
328
        plotting.plot_2_production_envelopes(wt_production.data_frame,
!
329
                                             mt_production.data_frame,
330
                                             self._target,
331
                                             self._biomass.id,
332
                                             **kwargs)
333

334
    @property
2×
335
    def data_frame(self):
336
        if self._processed_knockouts is None:
2×
337
            self._process_knockouts()
2×
338
        data_frame = DataFrame(self._processed_knockouts)
2×
339
        data_frame.sort_values("size", inplace=True)
2×
340
        return data_frame
2×
341

342
    def _repr_html_(self):
2×
NEW
343
        html_string = """
!
344
        <h3>OptKnock:</h3>
345
        <ul>
346
            <li>Target: %s</li>
347
        </ul>
348
        %s""" % (self._target, self.data_frame._repr_html_())
NEW
349
        return html_string
!
350

351
    def __len__(self):
2×
352
        return len(self.knockouts)
2×
353

354
    def __iter__(self):
2×
355
        for knockouts in self.knockouts:
2×
356
            yield StrainDesign(knockouts=knockouts)
2×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2021 Coveralls, Inc