• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

quaquel / EMAworkbench / 17683053131

12 Sep 2025 06:35PM UTC coverage: 86.155% (+0.8%) from 85.357%
17683053131

Pull #415

github

quaquel
some more unit tests and associated fixes
Pull Request #415: Evaluator overhaul

119 of 120 new or added lines in 11 files covered. (99.17%)

221 existing lines in 3 files now uncovered.

7511 of 8718 relevant lines covered (86.16%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.8
/ema_workbench/em_framework/optimization.py
1
"""Wrapper around platypus-opt."""
2

3
import copy
1✔
4
import functools
1✔
5
import os
1✔
6
import random
1✔
7
import shutil
1✔
8
import tarfile
1✔
9
import warnings
1✔
10

11
import numpy as np
1✔
12
import pandas as pd
1✔
13

14
from ..util import INFO, EMAError, get_module_logger, temporary_filter
1✔
15
from . import callbacks, evaluators
1✔
16
from .outcomes import AbstractOutcome
1✔
17
from .parameters import (
1✔
18
    BooleanParameter,
19
    CategoricalParameter,
20
    IntegerParameter,
21
    RealParameter,
22
)
23
from .points import Policy, Scenario
1✔
24
from .samplers import determine_parameters
1✔
25
from .util import ProgressTrackingMixIn, determine_objects
1✔
26

27
try:
    import platypus
    from platypus import (
        NSGAII,
        PCX,
        PM,
        SBX,
        SPX,
        UM,
        UNDX,
        DifferentialEvolution,
        EpsilonBoxArchive,
        EpsilonIndicator,
        EpsilonProgressContinuation,
        EpsNSGAII,
        GAOperator,
        GenerationalDistance,
        Hypervolume,
        Integer,
        InvertedGenerationalDistance,
        Multimethod,
        RandomGenerator,
        Real,
        Solution,
        Spacing,
        Subset,
        TournamentSelector,
        Variator,
    )  # @UnresolvedImport
    from platypus import Problem as PlatypusProblem


except ImportError:
    warnings.warn(
        "Platypus based optimization not available. Install with `pip install platypus-opt`",
        ImportWarning, stacklevel=2
    )

    # Minimal stand-ins so that this module can still be imported without
    # platypus. Every platypus name that is used as a base class at module
    # level needs a stub here, otherwise the class statements further down
    # raise NameError during import and the fallback is useless.

    class PlatypusProblem:
        constraints = []

        def __init__(self, *args, **kwargs):
            pass

    class Variator:
        def __init__(self, *args, **kwargs):
            pass

    class RandomGenerator:
        def __call__(self, *args, **kwargs):
            pass

    class TournamentSelector:
        def __init__(self, *args, **kwargs):
            pass

        def __call__(self, *args, **kwargs):
            pass

    class EpsilonProgressContinuation:
        pass

    # Stubs for the indicator classes that the *Metric wrappers inherit from.
    # They accept arbitrary arguments because MetricWrapper.__init__ forwards
    # ``reference_set`` and further keyword arguments to them.
    class Hypervolume:
        def __init__(self, *args, **kwargs):
            pass

    class GenerationalDistance:
        def __init__(self, *args, **kwargs):
            pass

    class InvertedGenerationalDistance:
        def __init__(self, *args, **kwargs):
            pass

    class EpsilonIndicator:
        def __init__(self, *args, **kwargs):
            pass

    class Spacing:
        def __init__(self, *args, **kwargs):
            pass

    EpsNSGAII = None
    platypus = None
    Real = Integer = Subset = None
×
92

93
# Created on 5 Jun 2017
94
#
95
# .. codeauthor::jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>
96

97
__all__ = [
1✔
98
    "ArchiveLogger",
99
    "Convergence",
100
    "EpsilonIndicatorMetric",
101
    "EpsilonProgress",
102
    "GenerationalDistanceMetric",
103
    "HypervolumeMetric",
104
    "InvertedGenerationalDistanceMetric",
105
    "OperatorProbabilities",
106
    "Problem",
107
    "RobustProblem",
108
    "SpacingMetric",
109
    "epsilon_nondominated",
110
    "rebuild_platypus_population",
111
    "to_problem",
112
    "to_robust_problem",
113
]
114
_logger = get_module_logger(__name__)
1✔
115

116

117
class Problem(PlatypusProblem):
    """Platypus problem enriched with workbench metadata.

    On top of the platypus problem it stores what is being searched over,
    the decision variables, the names of the outcomes, the workbench
    constraints and an optional reference scenario or policy.
    """

    def __init__(
        self, searchover, parameters, outcome_names, constraints, reference=None
    ):
        """Init.

        Parameters
        ----------
        searchover : {"levers", "uncertainties", "robust"}
        parameters : collection of workbench parameters (decision variables)
        outcome_names : collection of str
        constraints : collection of workbench constraints, or None
        reference : Scenario or Policy instance, optional
                    a Scenario when searching over levers, a Policy when
                    searching over uncertainties; not allowed for "robust"

        """
        ema_constraints = [] if constraints is None else constraints

        super().__init__(
            len(parameters), len(outcome_names), nconstrs=len(ema_constraints)
        )
        assert searchover in ("levers", "uncertainties", "robust")

        # type the reference has to have for each kind of search; a robust
        # search has no entry because it does not accept a reference at all
        reference_type = {"levers": Scenario, "uncertainties": Policy}.get(searchover)
        if reference_type is None:
            assert not reference
        else:
            assert not reference or isinstance(reference, reference_type)

        self.searchover = searchover
        self.parameters = parameters
        self.outcome_names = outcome_names
        self.ema_constraints = ema_constraints
        self.constraint_names = [c.name for c in ema_constraints]
        # a falsy reference is stored as 0
        self.reference = reference or 0

    @property
    def parameter_names(self):
        """Names of the decision variables, in the order of ``parameters``."""
        return [parameter.name for parameter in self.parameters]
1✔
153

154

155
class RobustProblem(Problem):
    """Problem specialisation for robust optimization.

    In addition to what Problem stores, it keeps the scenarios and the
    robustness functions.
    """

    def __init__(
        self, parameters, outcome_names, scenarios, robustness_functions, constraints
    ):
        """Init.

        There must be exactly one robustness function per outcome name.
        """
        super().__init__("robust", parameters, outcome_names, constraints)
        assert len(robustness_functions) == len(outcome_names)

        self.robustness_functions = robustness_functions
        self.scenarios = scenarios
1✔
169

170

171
def to_problem(model, searchover, reference=None, constraints=None):
    """Build a Problem instance for the given model.

    Parameters
    ----------
    model : AbstractModel instance
    searchover : str
    reference : Policy or Scenario instance, optional
                overwrite the default scenario in case of searching over
                levers, or default policy in case of searching over
                uncertainties
    constraints : list, optional

    Returns
    -------
    Problem instance

    Raises
    ------
    EMAError
        if the model has no outcomes other than those of kind INFO

    """
    decision_variables = determine_parameters(model, searchover, union=True)

    # outcomes of kind INFO are not optimized over
    objectives = [
        outcome
        for outcome in determine_objects(model, "outcomes")
        if outcome.kind != AbstractOutcome.INFO
    ]
    outcome_names = [objective.name for objective in objectives]

    if not objectives:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )

    problem = Problem(
        searchover, decision_variables, outcome_names, constraints, reference=reference
    )
    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [objective.kind for objective in objectives]
    # every constraint is declared as an equality-to-zero constraint
    problem.constraints[:] = "==0"

    return problem
1✔
209

210

211
def to_robust_problem(model, scenarios, robustness_functions, constraints=None):
    """Helper function to create RobustProblem object.

    Parameters
    ----------
    model : AbstractModel instance
    scenarios : collection
    robustness_functions : iterable of ScalarOutcomes
    constraints : list, optional

    Returns
    -------
    RobustProblem instance

    Raises
    ------
    EMAError
        if all robustness functions are of kind INFO

    """
    # extract the levers and the outcomes
    decision_variables = determine_parameters(model, "levers", union=True)

    # Robustness functions of kind INFO are not objectives. The filtered list
    # is used everywhere below, so that the number of robustness functions
    # handed to RobustProblem always matches the number of outcome names.
    # Passing the unfiltered collection made RobustProblem's length assertion
    # fail as soon as one INFO function was present. Building the list here
    # also means a one-shot iterable is consumed only once.
    objectives = [
        outcome
        for outcome in robustness_functions
        if outcome.kind != AbstractOutcome.INFO
    ]
    outcome_names = [outcome.name for outcome in objectives]

    if not objectives:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )

    problem = RobustProblem(
        decision_variables, outcome_names, scenarios, objectives, constraints
    )

    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in objectives]
    # every constraint is declared as an equality-to-zero constraint
    problem.constraints[:] = "==0"

    return problem
1✔
247

248

249
def to_platypus_types(decision_variables):
    """Translate workbench parameters into platypus decision variable types.

    Real and integer parameters are built from their lower and upper bound;
    categorical and boolean parameters become a platypus Subset of size 1
    over their categories.
    """
    # TODO:: should categorical not be platypus.Subset, with size == 1?
    platypus_class_for = {
        RealParameter: platypus.Real,
        IntegerParameter: platypus.Integer,
        CategoricalParameter: platypus.Subset,
        BooleanParameter: platypus.Subset,
    }

    def _convert(parameter):
        # lookup is on the exact parameter class, not via isinstance
        platypus_class = platypus_class_for[type(parameter)]
        if isinstance(parameter, (CategoricalParameter, BooleanParameter)):
            return platypus_class(parameter.categories, 1)
        return platypus_class(parameter.lower_bound, parameter.upper_bound)

    return [_convert(parameter) for parameter in decision_variables]
1✔
270

271

272
def to_dataframe(solutions, dvnames, outcome_names):
    """Convert platypus Solution instances into a pandas DataFrame.

    Duplicate solutions are dropped via ``platypus.unique``. Each remaining
    solution becomes one row holding the decoded decision variables followed
    by the objective values.

    Parameters
    ----------
    solutions : collection of Solution instances
    dvnames : list of str
    outcome_names : list of str

    Returns
    -------
    pandas DataFrame
    """
    rows = []
    for solution in platypus.unique(solutions):
        decoded = transform_variables(solution.problem, solution.variables)

        row = dict(zip(dvnames, decoded))
        row.update(zip(outcome_names, solution.objectives))
        rows.append(row)

    return pd.DataFrame(rows, columns=dvnames + outcome_names)
1✔
301

302

303
def process_uncertainties(jobs):
    """Turn platypus jobs into Scenario objects.

    Scenarios are named after their position in ``jobs``. The policies are
    the reference stored on the problem of the first job.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem

    scenarios = [
        Scenario(name=str(position), **values)
        for position, values in enumerate(_process(jobs, problem))
    ]

    return scenarios, problem.reference
×
327

328

329
def process_levers(jobs):
    """Turn platypus jobs into Policy objects.

    Policies are named after their position in ``jobs``. The scenarios are
    the reference stored on the problem of the first job.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem

    policies = [
        Policy(name=str(position), **values)
        for position, values in enumerate(_process(jobs, problem))
    ]

    return problem.reference, policies
×
352

353

354
def _process(jobs, problem):
    """Map each platypus job onto a dict of parameter name -> decoded value.

    A decoded value that exposes a ``value`` attribute is replaced by that
    attribute; any other value is used as is.
    """
    processed_jobs = []
    for job in jobs:
        decoded = transform_variables(problem, job.solution.variables)
        processed_jobs.append(
            {
                parameter.name: getattr(entry, "value", entry)
                for parameter, entry in zip(problem.parameters, decoded)
            }
        )
    return processed_jobs
×
368

369

370
def process_robust(jobs):
    """Turn platypus jobs of a robust optimization into policies.

    The policies come from ``process_levers``; the scenarios are those stored
    on the problem of the first job.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    policies = process_levers(jobs)[1]
    return jobs[0].solution.problem.scenarios, policies
×
386

387

388
def transform_variables(problem, variables):
    """Decode platypus variables into plain values.

    Parameters
    ----------
    problem : object with a ``types`` sequence
              each type must offer ``decode``
    variables : sequence of encoded variables, aligned with ``problem.types``

    Returns
    -------
    list with one decoded value per variable

    """
    converted_vars = []
    for platypus_type, encoded in zip(problem.types, variables):
        decoded = platypus_type.decode(encoded)
        try:
            # a subscriptable decoded value is reduced to its first element
            # (presumably the single-element list of a Subset of size 1)
            decoded = decoded[0]
        except TypeError:
            # not subscriptable, e.g. a plain number: keep the value as is
            pass

        converted_vars.append(decoded)
    return converted_vars
1✔
400

401

402
def evaluate(jobs_collection, experiments, outcomes, problem):
    """Feed the results of perform_experiments back into the platypus jobs.

    For every (entry, job) pair the matching experiment is looked up by name,
    the constraints are evaluated, and the objectives (plus constraint values,
    if any) are handed to platypus by installing a function on the problem and
    evaluating the solution.
    """
    outcome_names = problem.outcome_names
    constraints = problem.ema_constraints

    # experiments are matched on the policy name when searching over levers,
    # and on the scenario name otherwise
    column = "scenario"
    if problem.searchover == "levers":
        column = "policy"

    for entry, job in jobs_collection:
        mask = experiments[column] == entry.name

        # first value of every outcome for this experiment
        job_outputs = {name: values[mask][0] for name, values in outcomes.items()}

        # TODO:: only retain uncertainties
        # NOTE(review): unlike evaluate_robust this passes the masked frame
        # itself rather than its first row - confirm that this is intended
        job_constraints = _evaluate_constraints(
            experiments[mask], job_outputs, constraints
        )
        job_outcomes = [job_outputs[key] for key in outcome_names]

        result = (job_outcomes, job_constraints) if job_constraints else job_outcomes

        # platypus obtains the values by calling problem.function
        job.solution.problem.function = lambda _, result=result: result
        job.solution.evaluate()
×
429

430

431
def evaluate_robust(jobs_collection, experiments, outcomes, problem):
    """Feed the results of perform_experiments back into robust platypus jobs.

    For every (entry, job) pair each robustness function is computed over all
    experiments of the matching policy. The scores (plus constraint values,
    if any) are handed to platypus by installing a function on the problem
    and evaluating the solution.
    """
    robustness_functions = problem.robustness_functions
    constraints = problem.ema_constraints

    for entry, job in jobs_collection:
        mask = experiments["policy"] == entry.name

        scores_by_name = {}
        job_outcomes = []
        for rf in robustness_functions:
            inputs = [outcomes[var_name][mask] for var_name in rf.variable_name]
            score = rf.function(*inputs)
            scores_by_name[rf.name] = score
            job_outcomes.append(score)

        # TODO:: only retain levers
        first_experiment = experiments[mask].iloc[0]
        job_constraints = _evaluate_constraints(
            first_experiment, scores_by_name, constraints
        )

        result = (job_outcomes, job_constraints) if job_constraints else job_outcomes

        # platypus obtains the values by calling problem.function
        job.solution.problem.function = lambda _, result=result: result
        job.solution.evaluate()
×
459

460

461
def _evaluate_constraints(job_experiment, job_outcomes, constraints):
1✔
462
    """Helper function for evaluating the constraints for a given job."""
UNCOV
463
    job_constraints = []
×
UNCOV
464
    for constraint in constraints:
×
UNCOV
465
        data = [job_experiment[var] for var in constraint.parameter_names]
×
UNCOV
466
        data += [job_outcomes[var] for var in constraint.outcome_names]
×
467
        constraint_value = constraint.process(data)
×
468
        job_constraints.append(constraint_value)
×
469
    return job_constraints
×
470

471

472
class AbstractConvergenceMetric:
    """Base class for convergence metrics.

    A metric has a name and collects its values in ``results``. Subclasses
    implement ``__call__`` to record a value for the optimizer they receive.
    """

    def __init__(self, name):
        """Init.

        Parameters
        ----------
        name : str

        """
        # cooperative call, so the class can take part in multiple inheritance
        super().__init__()
        self.results = []
        self.name = name

    def __call__(self, optimizer):
        """Record the metric for the given optimizer; to be overridden."""
        raise NotImplementedError

    def get_results(self):
        """Return the values recorded so far."""
        return self.results

    def reset(self):
        """Start over with a fresh, empty list of results."""
        self.results = []
×
489

490

491
class EpsilonProgress(AbstractConvergenceMetric):
    """Convergence metric that tracks epsilon progress.

    Every call records ``optimizer.archive.improvements``.
    """

    def __init__(self):
        """Init."""
        super().__init__("epsilon_progress")

    def __call__(self, optimizer):  # noqa: D102
        improvements = optimizer.archive.improvements
        self.results.append(improvements)
×
500

501

502
class MetricWrapper:
    """Mix-in that lets platypus indicators work on workbench DataFrames.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    kwargs : dict
             any additional keyword arguments to be passed
             on to the wrapped platypus indicator class

    Notes
    -----
    The class is meant to be listed before a platypus indicator in the bases
    of a subclass. Through the MRO, the ``super()`` calls below then reach
    that indicator, after the DataFrame has been rebuilt into a population
    of platypus solutions.

    """

    def __init__(self, reference_set, problem, **kwargs):
        self.problem = problem
        rebuilt_reference = rebuild_platypus_population(reference_set, problem)
        super().__init__(reference_set=rebuilt_reference, **kwargs)

    def calculate(self, archive):
        """Calculate the indicator for an archive given as DataFrame."""
        population = rebuild_platypus_population(archive, self.problem)
        return super().calculate(population)
×
529

530

531
class HypervolumeMetric(MetricWrapper, Hypervolume):
    """Hypervolume metric.

    Thin wrapper around the Hypervolume indicator provided by platypus, so
    that it can be used directly with the DataFrames of the workbench.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance

    """
544

545

546
class GenerationalDistanceMetric(MetricWrapper, GenerationalDistance):
    """GenerationalDistance metric.

    Thin wrapper around the GenerationalDistance indicator provided by
    platypus, so that it can be used directly with the DataFrames of the
    workbench.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the intergenerational distance function

    See https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information.

    """
565

566

567
class InvertedGenerationalDistanceMetric(MetricWrapper, InvertedGenerationalDistance):
    """InvertedGenerationalDistance metric.

    Thin wrapper around the InvertedGenerationalDistance indicator provided
    by platypus, so that it can be used directly with the DataFrames of the
    workbench.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the inverted intergenerational distance function

    See https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information.

    """
586

587

588
class EpsilonIndicatorMetric(MetricWrapper, EpsilonIndicator):
    """EpsilonIndicator metric.

    Thin wrapper around the EpsilonIndicator provided by platypus, so that
    it can be used directly with the DataFrames of the workbench.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance

    """
602

603

604
class SpacingMetric(MetricWrapper, Spacing):
    """Spacing metric.

    Thin wrapper around the Spacing indicator provided by platypus, so that
    it can be used directly with the DataFrames of the workbench.

    Parameters
    ----------
    problem : PlatypusProblem instance

    """

    def __init__(self, problem):
        # no reference set is involved here, so only the problem is stored
        # and MetricWrapper.__init__ is not called
        self.problem = problem
×
620

621

622
class HyperVolume(AbstractConvergenceMetric):
    """Hypervolume convergence metric class.

    This metric is derived from a hyper-volume measure, which describes the
    multi-dimensional volume of space contained within the pareto front. When
    computed with minimum and maximums, it describes the ratio of dominated
    outcomes to all possible outcomes in the extent of the space.  Getting this
    number to be high or low is not necessarily important, as not all outcomes
    within the min-max range will be feasible.  But, having the hypervolume remain
    fairly stable over multiple generations of the evolutionary algorithm provides
    an indicator of convergence.

    Parameters
    ----------
    minimum : numpy array
    maximum : numpy array


    This class is deprecated and will be removed in version 3.0 of the EMAworkbench.
    Use ArchiveLogger instead and calculate hypervolume in post using HypervolumeMetric
    as also shown in the directed search tutorial.

    """

    def __init__(self, minimum, maximum):
        """Init; emits a DeprecationWarning."""
        super().__init__("hypervolume")
        # the two literals are concatenated, so the first one has to end in
        # a space to keep the sentences apart in the emitted message
        warnings.warn(
            "HyperVolume is deprecated and will be removed in version 3.0 of the EMAworkbench. "
            "Use ArchiveLogger and HypervolumeMetric instead",
            DeprecationWarning, stacklevel=2
        )
        self.hypervolume_func = Hypervolume(minimum=minimum, maximum=maximum)

    def __call__(self, optimizer):
        """Record the hypervolume of the optimizer's current archive."""
        self.results.append(self.hypervolume_func.calculate(optimizer.archive))

    @classmethod
    def from_outcomes(cls, outcomes):
        """Create the metric from the expected ranges of the outcomes.

        Outcomes of kind INFO are ignored. The first entry of each
        ``expected_range`` is used as minimum, the second as maximum.
        """
        ranges = [o.expected_range for o in outcomes if o.kind != o.INFO]
        minimum, maximum = np.asarray(list(zip(*ranges)))
        return cls(minimum, maximum)
×
663

664

665
class ArchiveLogger(AbstractConvergenceMetric):
    """Convergence 'metric' that writes the archive to disk at each call.

    Every call stores the current result of the optimizer as a csv file named
    after the number of function evaluations. ``get_results`` bundles all csv
    files into a gzipped tarball and removes the temporary directory.

    Parameters
    ----------
    directory : str
    decision_varnames : list of str
    outcome_varnames : list of str
    base_filename : str, optional
    """

    def __init__(
        self,
        directory,
        decision_varnames,
        outcome_varnames,
        base_filename="archives.tar.gz",
    ):
        """Init; creates ``<directory>/tmp`` if it does not exist yet."""
        super().__init__("archive_logger")

        self.directory = os.path.abspath(directory)

        # the csv files are collected here until get_results is called
        self.temp = os.path.join(self.directory, "tmp")
        os.makedirs(self.temp, exist_ok=True)

        self.base = base_filename
        self.tarfilename = os.path.join(self.directory, base_filename)
        self.decision_varnames = decision_varnames
        self.outcome_varnames = outcome_varnames

    def __call__(self, optimizer):  # noqa: D102
        target = os.path.join(self.temp, f"{optimizer.nfe}.csv")
        to_dataframe(
            optimizer.result, self.decision_varnames, self.outcome_varnames
        ).to_csv(target, index=False)

    def reset(self):  # noqa: D102
        # FIXME what needs to go here?
        pass

    def get_results(self):  # noqa: D102
        # pack the temporary directory into the tarball, then clean up
        with tarfile.open(self.tarfilename, "w:gz") as archive_file:
            archive_file.add(self.temp, arcname=os.path.basename(self.temp))
        shutil.rmtree(self.temp)

        # there is nothing to report back as convergence data
        return None

    @classmethod
    def load_archives(cls, filename):
        """Load the archives stored with the ArchiveLogger.

        Parameters
        ----------
        filename : str
                   relative path to file

        Returns
        -------
        dict with nfe as key and dataframe as value
        """
        archives = {}
        with tarfile.open(os.path.abspath(filename)) as tar:
            for member in tar.getmembers():
                if not member.name.endswith("csv"):
                    continue
                # members are named "<dir>/<nfe>.csv"; strip dir and suffix
                nfe = member.name.split("/")[1][:-4]
                archives[int(nfe)] = pd.read_csv(tar.extractfile(member))
        return archives
×
734

735

736
class OperatorProbabilities(AbstractConvergenceMetric):
    """OperatorProbability convergence tracker for use with auto adaptive operator selection.

    Parameters
    ----------
    name : str
    index : int


    State of the art MOEAs like Borg (and GenerationalBorg provided by the workbench)
    use autoadaptive operator selection. The algorithm has multiple different evolutionary
    operators. Over the run, it tracks how well each operator is doing in producing fitter
    offspring. The probability of the algorithm using a given evolutionary operator is
    proportional to how well this operator has been doing in producing fitter offspring in
    recent generations. This class can be used to track these probabilities over the
    run of the algorithm.

    """

    def __init__(self, name, index):
        super().__init__(name)
        # position of the tracked operator in the variator's probabilities
        self.index = index

    def __call__(self, optimizer):  # noqa: D102
        try:
            probabilities = optimizer.algorithm.variator.probabilities
            self.results.append(probabilities[self.index])
        except AttributeError:
            # best effort: nothing is recorded when the attribute chain
            # above is not available
            pass
×
765

766

767
def epsilon_nondominated(results, epsilons, problem):
    """Merge several result sets into one epsilon non-dominated set.

    Parameters
    ----------
    results : list of DataFrames
    epsilons : epsilon values for each objective
    problem : PlatypusProblem instance

    Returns
    -------
    DataFrame

    Raises
    ------
    ValueError
        if the number of epsilons differs from the number of objectives

    Notes
    -----
    this is a platypus based alternative to pareto.py (https://github.com/matthewjwoodruff/pareto.py)
    """
    n_epsilons = len(epsilons)
    if n_epsilons != problem.nobjs:
        raise ValueError(
            f"The number of epsilon values ({n_epsilons}) must match the number of objectives {problem.nobjs}"
        )

    merged = pd.concat(results, ignore_index=True)

    # adding the solutions to the archive is what does the sorting
    archive = EpsilonBoxArchive(epsilons)
    archive += rebuild_platypus_population(merged, problem)

    return to_dataframe(archive, problem.parameter_names, problem.outcome_names)
×
795

796

797
class Convergence(ProgressTrackingMixIn):
    """Tracks the convergence of an optimization run.

    Every call updates the progress tracking; the metrics themselves are only
    invoked once enough function evaluations have passed since the last time.
    """

    valid_metrics = {"hypervolume", "epsilon_progress", "archive_logger"}

    def __init__(
        self,
        metrics,
        max_nfe,
        convergence_freq=1000,
        logging_freq=5,
        log_progress=False,
    ):
        """Init.

        Parameters
        ----------
        metrics : list of AbstractConvergenceMetric instances, or None
        max_nfe : int
        convergence_freq : int, optional
                           number of function evaluations between two
                           invocations of the metrics
        logging_freq : int, optional
        log_progress : bool, optional

        """
        super().__init__(
            max_nfe,
            logging_freq,
            _logger,
            log_progress=log_progress,
            log_func=lambda self: f"generation {self.generation}, {self.i}/{self.max_nfe}",
        )

        self.max_nfe = max_nfe
        self.convergence_freq = convergence_freq
        self.logging_freq = logging_freq

        self.generation = -1
        self.index = []  # nfe at which the metrics were invoked
        self.last_check = 0  # nfe of the most recent invocation

        self.metrics = [] if metrics is None else metrics

        # TODO what is the point of this code?
        for metric in self.metrics:
            assert isinstance(metric, AbstractConvergenceMetric)
            metric.reset()

    def __call__(self, optimizer, force=False):
        """Store convergence information given the specified convergence frequency.

        Parameters
        ----------
        optimizer : platypus optimizer instance
        force : boolean, optional
                if True, convergence information will always be stored
                if False, convergence information will be stored if the
                number of nfe since the last time of storing is equal to
                or higher than convergence_freq


        the primary use case for force is to force convergence information
        to be stored once the stopping condition of the optimizer has been
        reached so that the final convergence information is kept.

        """
        nfe = optimizer.nfe
        # progress is advanced by the evaluations done since the last call
        super().__call__(nfe - self.i)

        self.generation += 1

        is_first_check = self.last_check == 0
        is_due = nfe >= self.last_check + self.convergence_freq
        if not (is_due or is_first_check or force):
            return

        self.index.append(nfe)
        self.last_check = nfe

        for metric in self.metrics:
            metric(optimizer)

    def to_dataframe(self):  # noqa: D102
        # metrics without results (None or empty) get no column
        collected = {}
        for metric in self.metrics:
            result = metric.get_results()
            if result:
                collected[metric.name] = result

        progress = pd.DataFrame.from_dict(collected)

        if not progress.empty:
            progress["nfe"] = self.index

        return progress
×
884

885

886
def rebuild_platypus_population(archive, problem):
    """Rebuild a population of platypus Solution instances.

    Parameters
    ----------
    archive : DataFrame
              must hold one column per parameter name and per outcome name
              of the problem
    problem : PlatypusProblem instance

    Returns
    -------
    list of platypus Solutions

    Raises
    ------
    EMAError
        if the number of columns does not match the number of decision
        variables plus objectives, or if a parameter or outcome name is not
        a column of the archive

    """
    expected_columns = problem.nvars + problem.nobjs
    actual_columns = len(archive.columns)

    if actual_columns != expected_columns:
        raise EMAError(
            f"The number of columns in the archive ({actual_columns}) does not match the "
            f"expected number of decision variables and objectives ({expected_columns})."
        )

    parameter_names = list(problem.parameter_names)
    outcome_names = list(problem.outcome_names)

    # The names are checked against the column labels rather than against the
    # attributes of the rows: itertuples renames columns that are not valid
    # Python identifiers (e.g. names containing a space), so attribute access
    # would report such columns as missing although they exist.
    missing_parameters = [
        name for name in parameter_names if name not in archive.columns
    ]
    if missing_parameters:
        raise EMAError(f"Parameter names {missing_parameters} not found in archive")

    missing_outcomes = [name for name in outcome_names if name not in archive.columns]
    if missing_outcomes:
        raise EMAError(f"Outcome names {missing_outcomes} not found in archive")

    n_parameters = len(parameter_names)
    ordered = archive[parameter_names + outcome_names]

    solutions = []
    # plain tuples: decision variables first, followed by the objectives
    for row in ordered.itertuples(index=False, name=None):
        solution = Solution(problem)
        solution.variables[:] = [
            platypus_type.encode(value)
            for platypus_type, value in zip(problem.types, row[:n_parameters])
        ]
        solution.objectives[:] = list(row[n_parameters:])
        solutions.append(solution)
    return solutions
×
936

937

938
class CombinedVariator(Variator):
    """Variator that applies type-specific crossover and mutation.

    Every decision variable is handled by the operator registered for the
    class of its platypus type in ``_crossover`` and ``_mutate``: SBX and
    polynomial mutation (PM) for ``Real``, HUX and bit flip for ``Integer``,
    and a subset swap and replace for ``Subset``.

    Parameters
    ----------
    crossover_prob : float, optional
        probability, per decision variable, of applying crossover
    mutation_prob : float, optional
        probability, per decision variable, of applying mutation

    """

    def __init__(self, crossover_prob=0.5, mutation_prob=1):
        # arity of 2: evolve operates on two parents
        super().__init__(2)
        self.SBX = platypus.SBX()
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob

    def evolve(self, parents):
        """Create two offspring from two parents.

        Parameters
        ----------
        parents : sequence of platypus Solutions
            only the first two entries are used

        Returns
        -------
        list with the two offspring

        """
        child1 = copy.deepcopy(parents[0])
        child2 = copy.deepcopy(parents[1])
        problem = child1.problem

        # crossover
        # we will evolve the individual
        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.crossover_prob:
                klass = kind.__class__
                child1, child2 = self._crossover[klass](self, child1, child2, i, kind)
                child1.evaluated = False
                child2.evaluated = False

        # mutate
        for child in [child1, child2]:
            self.mutate(child)

        return [child1, child2]

    def mutate(self, child):
        """Mutate the decision variables of child in place.

        Each decision variable is mutated with probability ``mutation_prob``
        using the operator registered for the class of its type.

        Parameters
        ----------
        child : platypus Solution

        """
        problem = child.problem

        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.mutation_prob:
                klass = kind.__class__
                child = self._mutate[klass](self, child, i, kind)
                child.evaluated = False

    def crossover_real(self, child1, child2, i, type):  # @ReservedAssignment
        """Apply SBX crossover to decision variable i of both children.

        Parameters
        ----------
        child1, child2 : platypus Solution
        i : int
            index of the decision variable
        type : platypus type
            provides ``min_value`` and ``max_value``

        Returns
        -------
        tuple with both (modified) children

        """
        # sbx
        x1 = float(child1.variables[i])
        x2 = float(child2.variables[i])
        lb = type.min_value
        ub = type.max_value

        x1, x2 = self.SBX.sbx_crossover(x1, x2, lb, ub)

        child1.variables[i] = x1
        child2.variables[i] = x2

        return child1, child2

    def crossover_integer(self, child1, child2, i, type):  # @ReservedAssignment
        """Apply HUX crossover to decision variable i of both children.

        For every position where the two encodings differ, both entries are
        negated with a probability of one half.

        Parameters
        ----------
        child1, child2 : platypus Solution
        i : int
            index of the decision variable
        type : platypus type
            provides ``nbits``

        Returns
        -------
        tuple with both (modified) children

        """
        # HUX()
        for j in range(type.nbits):
            if child1.variables[i][j] != child2.variables[i][j]:  # noqa: SIM102
                if bool(random.getrandbits(1)):
                    child1.variables[i][j] = not child1.variables[i][j]
                    child2.variables[i][j] = not child2.variables[i][j]
        return child1, child2

    def crossover_categorical(self, child1, child2, i, type):  # @ReservedAssignment
        """Apply subset crossover to decision variable i of both children.

        The entries at position j are exchanged, with a probability of one
        half, when neither entry was part of the other child's subset before
        the crossover started.

        Parameters
        ----------
        child1, child2 : platypus Solution
        i : int
            index of the decision variable
        type : platypus type
            provides ``size``

        Returns
        -------
        tuple with both (modified) children

        """
        # SSX()
        # can probably be implemented in a simple manner, since size
        # of subset is fixed to 1

        # membership is tested against the subsets as they were on entry
        s1 = set(child1.variables[i])
        s2 = set(child2.variables[i])

        for j in range(type.size):
            if (
                (child2.variables[i][j] not in s1)
                and (child1.variables[i][j] not in s2)
                and (random.random() < 0.5)
            ):
                child1.variables[i][j], child2.variables[i][j] = (
                    child2.variables[i][j],
                    child1.variables[i][j],
                )

        return child1, child2

    def mutate_real(self, child, i, type, distribution_index=20):  # @ReservedAssignment
        """Apply polynomial mutation to decision variable i of child.

        Parameters
        ----------
        child : platypus Solution
        i : int
            index of the decision variable
        type : platypus type
            provides ``min_value`` and ``max_value``
        distribution_index : float, optional

        Returns
        -------
        the (modified) child

        """
        # PM
        x = child.variables[i]
        lower = type.min_value
        upper = type.max_value

        u = random.random()
        dx = upper - lower

        if u < 0.5:
            bl = (x - lower) / dx
            b = 2.0 * u + (1.0 - 2.0 * u) * pow(1.0 - bl, distribution_index + 1.0)
            delta = pow(b, 1.0 / (distribution_index + 1.0)) - 1.0
        else:
            bu = (upper - x) / dx
            b = 2.0 * (1.0 - u) + 2.0 * (u - 0.5) * pow(
                1.0 - bu, distribution_index + 1.0
            )
            delta = 1.0 - pow(b, 1.0 / (distribution_index + 1.0))

        x = x + delta * dx
        # keep the mutated value within the bounds
        x = max(lower, min(x, upper))

        child.variables[i] = x
        return child

    def mutate_integer(self, child, i, type, probability=1):  # @ReservedAssignment
        """Apply bit flip mutation to decision variable i of child.

        Parameters
        ----------
        child : platypus Solution
        i : int
            index of the decision variable
        type : platypus type
            provides ``nbits``
        probability : float, optional
            probability of negating each individual entry

        Returns
        -------
        the (modified) child

        """
        # bitflip
        for j in range(type.nbits):
            if random.random() <= probability:
                child.variables[i][j] = not child.variables[i][j]
        return child

    def mutate_categorical(self, child, i, type):  # @ReservedAssignment
        """Apply replace mutation to decision variable i of child.

        With probability ``1 / type.size``, and only if the subset does not
        already contain all elements, a randomly chosen member of the subset
        is replaced by a randomly chosen non-member.

        Parameters
        ----------
        child : platypus Solution
        i : int
            index of the decision variable
        type : platypus type
            provides ``size`` and ``elements``

        Returns
        -------
        the (modified) child

        """
        # replace
        probability = 1 / type.size

        if random.random() <= probability:
            subset = child.variables[i]

            if len(subset) < len(type.elements):
                j = random.randrange(len(subset))

                nonmembers = list(set(type.elements) - set(subset))
                k = random.randrange(len(nonmembers))
                subset[j] = nonmembers[k]

            child.variables[i] = subset

        return child

    # dispatch tables keyed on the exact class of the platypus type
    _crossover = {
        Real: crossover_real,
        Integer: crossover_integer,
        Subset: crossover_categorical,
    }

    _mutate = {
        Real: mutate_real,
        Integer: mutate_integer,
        Subset: mutate_categorical,
    }
1083

1084

1085
def _optimize(
    problem,
    evaluator,
    algorithm,
    convergence,
    nfe,
    convergence_freq,
    logging_freq,
    variator=None,
    **kwargs,
):
    """Helper function for optimization.

    Parameters
    ----------
    problem : PlatypusProblem instance
    evaluator : evaluator instance
        handed to the algorithm; its ``callback`` attribute is set to the
        convergence tracker
    algorithm : callable
        called as ``algorithm(problem, evaluator=..., variator=...,
        log_frequency=500, **kwargs)`` to create the optimizer
    convergence : object
        passed on to ``Convergence``
    nfe : int
        passed on to ``Convergence`` and to ``optimizer.run``
    convergence_freq : int
        passed on to ``Convergence``
    logging_freq : int
        passed on to ``Convergence``
    variator : platypus Variator instance, optional
        if None, a ``CombinedVariator`` is used when the decision variables
        are not all instances of the class of the first one; otherwise None
        is passed to the algorithm
    kwargs : dict
        any additional arguments are passed on to algorithm. If "epsilons"
        is included, its length has to match the number of outcomes.

    Returns
    -------
    DataFrame or tuple of DataFrames
        only the results if the convergence DataFrame is empty, otherwise
        the results and the convergence information

    Raises
    ------
    EMAError
        if the number of epsilon values does not match the number of outcomes

    """
    if "epsilons" in kwargs and len(kwargs["epsilons"]) != len(problem.outcome_names):
        raise EMAError("Number of epsilon values does not match number of outcomes")

    if variator is None:
        klass = problem.types[0].__class__
        if not all(isinstance(t, klass) for t in problem.types):
            # mixed decision variable types need the type-aware variator
            variator = CombinedVariator()

    optimizer = algorithm(
        problem, evaluator=evaluator, variator=variator, log_frequency=500, **kwargs
    )

    tracker = Convergence(
        convergence, nfe, convergence_freq=convergence_freq, logging_freq=logging_freq
    )
    evaluator.callback = functools.partial(tracker, optimizer)

    with temporary_filter(name=[callbacks.__name__, evaluators.__name__], level=INFO):
        optimizer.run(nfe)

    tracker(optimizer, force=True)

    results = to_dataframe(
        optimizer.result, problem.parameter_names, problem.outcome_names
    )
    convergence_results = tracker.to_dataframe()

    # not every algorithm maintains an archive (it can be missing or None);
    # fall back to the result so a finished run is not lost to a logging error
    archive = getattr(optimizer, "archive", None)
    n_solutions = len(optimizer.result if archive is None else archive)
    _logger.info(f"optimization completed, found {n_solutions} solutions")

    if convergence_results.empty:
        return results
    else:
        return results, convergence_results
×
1144

1145

1146
class BORGDefaultDescriptor:
    """Descriptor used by Borg.

    The value of the attribute is computed on every access by calling
    ``default_function`` with the number of decision variables of the
    problem of the owning instance (``instance.problem.nvars``).

    Parameters
    ----------
    default_function : callable
        called with the number of decision variables, returns the default

    """

    # this treats defaults as class level attributes!

    def __init__(self, default_function):
        self.default_function = default_function

    def __get__(self, instance, owner):
        # accessed on the class rather than on an instance: there is no
        # problem to derive the default from, so return the descriptor itself
        if instance is None:
            return self
        return self.default_function(instance.problem.nvars)

    def __set_name__(self, owner, name):
        self.name = name
1✔
1159

1160

1161
class GenerationalBorg(EpsilonProgressContinuation):
    """Generational version of the BORG Framework.

    Epsilon Progress Continuation and Auto Adaptive Operator Selection are
    adopted from BORG, but they are embedded within the generational NSGAII
    algorithm instead of the steady state implementation BORG itself uses.

    All operators are parametrized with the default values as used in
    Borg 1.9.

    Note:: limited to RealParameters only.

    """

    pm_p = BORGDefaultDescriptor(lambda x: 1 / x)
    pm_dist = 20

    sbx_prop = 1
    sbx_dist = 15

    de_rate = 0.1
    de_stepsize = 0.5

    um_p = BORGDefaultDescriptor(lambda x: 1 / x)

    spx_nparents = 10
    spx_noffspring = 2
    spx_expansion = 0.3

    pcx_nparents = 10
    pcx_noffspring = 2
    pcx_eta = 0.1
    pcx_zeta = 0.1

    undx_nparents = 10
    undx_noffspring = 2
    undx_zeta = 0.5
    undx_eta = 0.35

    def __init__(
        self,
        problem,
        epsilons,
        population_size=100,
        generator=RandomGenerator(),  # noqa: B008
        selector=TournamentSelector(2),  # noqa: B008
        variator=None,
        **kwargs,
    ):
        """Init.

        The ``variator`` argument is accepted but not used: the algorithm
        always runs with its own Multimethod variator.
        """
        # has to be set first, pm_p and um_p are derived from problem.nvars
        self.problem = problem

        def polynomial_mutation():
            return PM(probability=self.pm_p, distribution_index=self.pm_dist)

        # Parameterization taken from
        # Borg: An Auto-Adaptive MOEA Framework - Hadka, Reed
        recombinators = (
            SBX(probability=self.sbx_prop, distribution_index=self.sbx_dist),
            PCX(
                nparents=self.pcx_nparents,
                noffspring=self.pcx_noffspring,
                eta=self.pcx_eta,
                zeta=self.pcx_zeta,
            ),
            DifferentialEvolution(
                crossover_rate=self.de_rate, step_size=self.de_stepsize
            ),
            UNDX(
                nparents=self.undx_nparents,
                noffspring=self.undx_noffspring,
                zeta=self.undx_zeta,
                eta=self.undx_eta,
            ),
            SPX(
                nparents=self.spx_nparents,
                noffspring=self.spx_noffspring,
                expansion=self.spx_expansion,
            ),
        )

        # every recombination operator is followed by its own PM instance,
        # uniform mutation is used on its own as the final operator
        operators = [
            GAOperator(recombinator, polynomial_mutation())
            for recombinator in recombinators
        ]
        operators.append(UM(probability=self.um_p))

        embedded_algorithm = NSGAII(
            problem,
            population_size,
            generator,
            selector,
            Multimethod(self, operators),
            EpsilonBoxArchive(epsilons),
            **kwargs,
        )
        super().__init__(embedded_algorithm)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc