/ema_workbench/em_framework/optimization.py

"""Wrapper around platypus-opt."""

import copy
import functools
import os
import random
import shutil
import tarfile
import warnings

import numpy as np
import pandas as pd

from ..util import INFO, EMAError, get_module_logger, temporary_filter
from . import callbacks, evaluators
from .outcomes import AbstractOutcome
from .parameters import (
    BooleanParameter,
    CategoricalParameter,
    IntegerParameter,
    RealParameter,
)
from .points import Policy, Scenario
from .samplers import determine_parameters
from .util import ProgressTrackingMixIn, determine_objects

try:
    import platypus
    from platypus import (
        NSGAII,
        PCX,
        PM,
        SBX,
        SPX,
        UM,
        UNDX,
        DifferentialEvolution,
        EpsilonBoxArchive,
        EpsilonIndicator,
        EpsilonProgressContinuation,
        EpsNSGAII,
        GAOperator,
        GenerationalDistance,
        Hypervolume,
        Integer,
        InvertedGenerationalDistance,
        Multimethod,
        RandomGenerator,
        Real,
        Solution,
        Spacing,
        Subset,
        TournamentSelector,
        Variator,
    )  # @UnresolvedImport
    from platypus import Problem as PlatypusProblem

except ImportError:
    warnings.warn(
        "Platypus based optimization not available. Install with `pip install platypus-opt`",
        ImportWarning,
        stacklevel=2,
    )

    class PlatypusProblem:
        constraints = []

        def __init__(self, *args, **kwargs):
            pass

    class Variator:
        def __init__(self, *args, **kwargs):
            pass

    class RandomGenerator:
        def __call__(self, *args, **kwargs):
            pass

    class TournamentSelector:
        def __init__(self, *args, **kwargs):
            pass

        def __call__(self, *args, **kwargs):
            pass

    class EpsilonProgressContinuation:
        pass

    EpsNSGAII = None
    platypus = None
    Real = Integer = Subset = None

# Created on 5 Jun 2017
#
# .. codeauthor::jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>

__all__ = [
    "ArchiveLogger",
    "Convergence",
    "EpsilonIndicatorMetric",
    "EpsilonProgress",
    "GenerationalDistanceMetric",
    "HypervolumeMetric",
    "InvertedGenerationalDistanceMetric",
    "OperatorProbabilities",
    "Problem",
    "RobustProblem",
    "SpacingMetric",
    "epsilon_nondominated",
    "rebuild_platypus_population",
    "to_problem",
    "to_robust_problem",
]
_logger = get_module_logger(__name__)


class Problem(PlatypusProblem):
    """Small extension to Platypus problem object.

    Includes information on the names of the decision variables, the names of the outcomes,
    and the type of search.
    """

    @property
    def parameter_names(self):
        """Getter for parameter names."""
        return [e.name for e in self.parameters]

    def __init__(
        self, searchover, parameters, outcome_names, constraints, reference=None
    ):
        """Init."""
        if constraints is None:
            constraints = []

        super().__init__(len(parameters), len(outcome_names), nconstrs=len(constraints))
        assert searchover in ("levers", "uncertainties", "robust")

        if searchover == "levers":
            assert not reference or isinstance(reference, Scenario)
        elif searchover == "uncertainties":
            assert not reference or isinstance(reference, Policy)
        else:
            assert not reference

        self.searchover = searchover
        self.parameters = parameters
        self.outcome_names = outcome_names
        self.ema_constraints = constraints
        self.constraint_names = [c.name for c in constraints]
        self.reference = reference if reference else 0


class RobustProblem(Problem):
    """Small extension to Problem object for robust optimization.

    Adds the scenarios and the robustness functions.
    """

    def __init__(
        self, parameters, outcome_names, scenarios, robustness_functions, constraints
    ):
        """Init."""
        super().__init__("robust", parameters, outcome_names, constraints)
        assert len(robustness_functions) == len(outcome_names)
        self.scenarios = scenarios
        self.robustness_functions = robustness_functions


def to_problem(model, searchover, reference=None, constraints=None):
    """Helper function to create Problem object.

    Parameters
    ----------
    model : AbstractModel instance
    searchover : str
    reference : Policy or Scenario instance, optional
                overwrite the default scenario in case of searching over
                levers, or default policy in case of searching over
                uncertainties
    constraints : list, optional

    Returns:
    -------
    Problem instance

    """
    # extract the levers and the outcomes
    decision_variables = determine_parameters(model, searchover, union=True)

    outcomes = determine_objects(model, "outcomes")
    outcomes = [outcome for outcome in outcomes if outcome.kind != AbstractOutcome.INFO]
    outcome_names = [outcome.name for outcome in outcomes]

    if not outcomes:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )

    problem = Problem(
        searchover, decision_variables, outcome_names, constraints, reference=reference
    )
    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in outcomes]
    problem.constraints[:] = "==0"

    return problem


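# A minimal usage sketch for to_problem (illustrative only, not executed as
# part of this module; ``model`` stands for any AbstractModel with levers and
# non-INFO outcomes):
#
#     problem = to_problem(model, searchover="levers")
#     print(problem.parameter_names, problem.outcome_names)
#
# The result is a regular platypus Problem, so it can be handed to any
# platypus algorithm.
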
def to_robust_problem(model, scenarios, robustness_functions, constraints=None):
    """Helper function to create RobustProblem object.

    Parameters
    ----------
    model : AbstractModel instance
    scenarios : collection
    robustness_functions : iterable of ScalarOutcomes
    constraints : list, optional

    Returns:
    -------
    RobustProblem instance

    """
    # extract the levers and the outcomes
    decision_variables = determine_parameters(model, "levers", union=True)

    outcomes = robustness_functions
    outcomes = [outcome for outcome in outcomes if outcome.kind != AbstractOutcome.INFO]
    outcome_names = [outcome.name for outcome in outcomes]

    if not outcomes:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )

    problem = RobustProblem(
        decision_variables, outcome_names, scenarios, robustness_functions, constraints
    )

    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in outcomes]
    problem.constraints[:] = "==0"

    return problem


def to_platypus_types(decision_variables):
    """Helper function for mapping from workbench parameter types to platypus parameter types."""
    # TODO:: should categorical not be platypus.Subset, with size == 1?
    _type_mapping = {
        RealParameter: platypus.Real,
        IntegerParameter: platypus.Integer,
        CategoricalParameter: platypus.Subset,
        BooleanParameter: platypus.Subset,
    }

    types = []
    for dv in decision_variables:
        klass = _type_mapping[type(dv)]

        if not isinstance(dv, CategoricalParameter | BooleanParameter):
            decision_variable = klass(dv.lower_bound, dv.upper_bound)
        else:
            decision_variable = klass(dv.categories, 1)

        types.append(decision_variable)
    return types


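# Illustrative mapping: a RealParameter("a", 0, 1) becomes platypus.Real(0, 1),
# an IntegerParameter("b", 0, 10) becomes platypus.Integer(0, 10), while
# CategoricalParameter and BooleanParameter instances both become a
# platypus.Subset of size 1, i.e. exactly one category is selected per solution.
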
def to_dataframe(solutions, dvnames, outcome_names):
    """Helper function to turn a collection of platypus Solution instances into a pandas DataFrame.

    Parameters
    ----------
    solutions : collection of Solution instances
    dvnames : list of str
    outcome_names : list of str

    Returns:
    -------
    pandas DataFrame
    """
    results = []
    for solution in platypus.unique(solutions):
        vars = transform_variables(
            solution.problem, solution.variables
        )  # @ReservedAssignment

        decision_vars = dict(zip(dvnames, vars))
        decision_out = dict(zip(outcome_names, solution.objectives))

        result = decision_vars.copy()
        result.update(decision_out)

        results.append(result)

    results = pd.DataFrame(results, columns=dvnames + outcome_names)
    return results


def process_uncertainties(jobs):
    """Helper function to map jobs generated by platypus to Scenario objects.

    Parameters
    ----------
    jobs : collection

    Returns:
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem
    scenarios = []

    jobs = _process(jobs, problem)
    for i, job in enumerate(jobs):
        name = str(i)
        scenario = Scenario(name=name, **job)
        scenarios.append(scenario)

    policies = problem.reference

    return scenarios, policies


def process_levers(jobs):
    """Helper function to map jobs generated by platypus to Policy objects.

    Parameters
    ----------
    jobs : collection

    Returns:
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem
    policies = []
    jobs = _process(jobs, problem)
    for i, job in enumerate(jobs):
        name = str(i)
        policy = Policy(name=name, **job)
        policies.append(policy)

    scenarios = problem.reference

    return scenarios, policies


def _process(jobs, problem):
    """Helper function to transform platypus job to dict with correct values for workbench."""
    processed_jobs = []
    for job in jobs:
        variables = transform_variables(problem, job.solution.variables)
        processed_job = {}
        for param, var in zip(problem.parameters, variables):
            try:
                var = var.value  # noqa: PLW2901
            except AttributeError:
                pass
            processed_job[param.name] = var
        processed_jobs.append(processed_job)
    return processed_jobs


def process_robust(jobs):
    """Helper function to process robust optimization jobs.

    Parameters
    ----------
    jobs : collection

    Returns:
    -------
    scenarios, policies

    """
    _, policies = process_levers(jobs)
    scenarios = jobs[0].solution.problem.scenarios

    return scenarios, policies


def transform_variables(problem, variables):
    """Helper function for transforming platypus variables."""
    converted_vars = []
    for type, var in zip(problem.types, variables):  # @ReservedAssignment
        var = type.decode(var)  # noqa: PLW2901
        try:
            var = var[0]  # noqa: PLW2901
        except TypeError:
            pass

        converted_vars.append(var)
    return converted_vars


def evaluate(jobs_collection, experiments, outcomes, problem):
    """Helper function for mapping the results from perform_experiments back to what platypus needs."""
    searchover = problem.searchover
    outcome_names = problem.outcome_names
    constraints = problem.ema_constraints

    column = "policy" if searchover == "levers" else "scenario"

    for entry, job in jobs_collection:
        logical = experiments[column] == entry.name

        job_outputs = {}
        for k, v in outcomes.items():
            job_outputs[k] = v[logical][0]

        # TODO:: only retain uncertainties
        job_experiment = experiments[logical]
        job_constraints = _evaluate_constraints(
            job_experiment, job_outputs, constraints
        )
        job_outcomes = [job_outputs[key] for key in outcome_names]

        if job_constraints:
            job.solution.problem.function = lambda _: (job_outcomes, job_constraints)  # noqa: B023
        else:
            job.solution.problem.function = lambda _: job_outcomes  # noqa: B023
        job.solution.evaluate()


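# Note: the lambdas above intentionally close over the loop variables (hence
# the noqa: B023). This is safe here because each lambda is consumed
# immediately by job.solution.evaluate() before the loop moves on; the same
# pattern recurs in evaluate_robust below.
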
def evaluate_robust(jobs_collection, experiments, outcomes, problem):
    """Helper function for mapping the results from perform_experiments back to what platypus needs."""
    robustness_functions = problem.robustness_functions
    constraints = problem.ema_constraints

    for entry, job in jobs_collection:
        logical = experiments["policy"] == entry.name

        job_outcomes_dict = {}
        job_outcomes = []
        for rf in robustness_functions:
            data = [outcomes[var_name][logical] for var_name in rf.variable_name]
            score = rf.function(*data)
            job_outcomes_dict[rf.name] = score
            job_outcomes.append(score)

        # TODO:: only retain levers
        job_experiment = experiments[logical].iloc[0]
        job_constraints = _evaluate_constraints(
            job_experiment, job_outcomes_dict, constraints
        )

        if job_constraints:
            job.solution.problem.function = lambda _: (job_outcomes, job_constraints)  # noqa: B023
        else:
            job.solution.problem.function = lambda _: job_outcomes  # noqa: B023

        job.solution.evaluate()


def _evaluate_constraints(job_experiment, job_outcomes, constraints):
    """Helper function for evaluating the constraints for a given job."""
    job_constraints = []
    for constraint in constraints:
        data = [job_experiment[var] for var in constraint.parameter_names]
        data += [job_outcomes[var] for var in constraint.outcome_names]
        constraint_value = constraint.process(data)
        job_constraints.append(constraint_value)
    return job_constraints


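# Note on constraint semantics: to_problem and to_robust_problem set
# problem.constraints[:] = "==0", so platypus treats each value produced here
# as a distance to feasibility, and a solution is feasible only when every
# constraint evaluates to exactly 0 (this assumes constraint.process returns 0
# for a satisfied constraint, which is how workbench Constraints behave).
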
class AbstractConvergenceMetric:
    """Base convergence metric class."""

    def __init__(self, name):
        """Init."""
        super().__init__()
        self.name = name
        self.results = []

    def __call__(self, optimizer):
        """Store the metric value for the current state of the optimizer."""
        raise NotImplementedError

    def reset(self):
        """Clear any previously stored results."""
        self.results = []

    def get_results(self):
        """Return the stored results."""
        return self.results


class EpsilonProgress(AbstractConvergenceMetric):
    """Epsilon progress convergence metric class."""

    def __init__(self):
        """Init."""
        super().__init__("epsilon_progress")

    def __call__(self, optimizer):  # noqa: D102
        self.results.append(optimizer.archive.improvements)


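# Typical convergence tracking sketch (illustrative; follows the workbench's
# documented evaluator usage, with parameter values chosen arbitrarily):
#
#     with SequentialEvaluator(model) as evaluator:
#         results, convergence = evaluator.optimize(
#             nfe=10000,
#             searchover="levers",
#             epsilons=[0.05] * len(model.outcomes),
#             convergence=[EpsilonProgress()],
#         )
#
# convergence then holds an epsilon_progress column plus an nfe column.
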
class MetricWrapper:
    """Wrapper class for wrapping platypus indicators.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    kwargs : dict
             any additional keyword arguments to be passed
             on to the wrapped platypus indicator class

    Notes:
    -----
    this class relies on multiple inheritance and careful consideration
    of the MRO to conveniently wrap the convergence metrics provided
    by platypus.

    """

    def __init__(self, reference_set, problem, **kwargs):
        self.problem = problem
        reference_set = rebuild_platypus_population(reference_set, self.problem)
        super().__init__(reference_set=reference_set, **kwargs)

    def calculate(self, archive):
        """Calculate the wrapped indicator for a DataFrame-based archive."""
        solutions = rebuild_platypus_population(archive, self.problem)
        return super().calculate(solutions)


class HypervolumeMetric(MetricWrapper, Hypervolume):
    """Hypervolume metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance

    this is a thin wrapper around Hypervolume as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    """


class GenerationalDistanceMetric(MetricWrapper, GenerationalDistance):
    """GenerationalDistance metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the generational distance function


    This is a thin wrapper around GenerationalDistance as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    see https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information

    """


class InvertedGenerationalDistanceMetric(MetricWrapper, InvertedGenerationalDistance):
    """InvertedGenerationalDistance metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the inverted generational distance function


    This is a thin wrapper around InvertedGenerationalDistance as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    see https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information

    """


class EpsilonIndicatorMetric(MetricWrapper, EpsilonIndicator):
    """EpsilonIndicator metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance


    this is a thin wrapper around EpsilonIndicator as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    """


class SpacingMetric(MetricWrapper, Spacing):
    """Spacing metric.

    Parameters
    ----------
    problem : PlatypusProblem instance


    this is a thin wrapper around Spacing as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    """

    def __init__(self, problem):  # noqa: D107
        self.problem = problem


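# Post-hoc convergence scoring sketch (illustrative; assumes archives written
# by ArchiveLogger below and a reference set, e.g. from epsilon_nondominated):
#
#     problem = to_problem(model, searchover="levers")
#     hv = HypervolumeMetric(reference_set, problem)
#     scores = {nfe: hv.calculate(archive) for nfe, archive in archives.items()}
#
# The other MetricWrapper subclasses are used in the same way; SpacingMetric
# needs only the problem.
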
class HyperVolume(AbstractConvergenceMetric):
    """Hypervolume convergence metric class.

    This metric is derived from a hyper-volume measure, which describes the
    multi-dimensional volume of space contained within the pareto front. When
    computed with minimums and maximums, it describes the ratio of dominated
    outcomes to all possible outcomes in the extent of the space. Getting this
    number to be high or low is not necessarily important, as not all outcomes
    within the min-max range will be feasible. But, having the hypervolume remain
    fairly stable over multiple generations of the evolutionary algorithm provides
    an indicator of convergence.

    Parameters
    ----------
    minimum : numpy array
    maximum : numpy array


    This class is deprecated and will be removed in version 3.0 of the EMAworkbench.
    Use ArchiveLogger instead and calculate hypervolume in post using HypervolumeMetric
    as also shown in the directed search tutorial.

    """

    def __init__(self, minimum, maximum):
        super().__init__("hypervolume")
        warnings.warn(
            "HyperVolume is deprecated and will be removed in version 3.0 of the EMAworkbench. "
            "Use ArchiveLogger and HypervolumeMetric instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.hypervolume_func = Hypervolume(minimum=minimum, maximum=maximum)

    def __call__(self, optimizer):
        self.results.append(self.hypervolume_func.calculate(optimizer.archive))

    @classmethod
    def from_outcomes(cls, outcomes):
        """Create a HyperVolume instance from the expected ranges of the outcomes."""
        ranges = [o.expected_range for o in outcomes if o.kind != o.INFO]
        minimum, maximum = np.asarray(list(zip(*ranges)))
        return cls(minimum, maximum)


class ArchiveLogger(AbstractConvergenceMetric):
    """Helper class to write the archive to disk at each iteration.

    Parameters
    ----------
    directory : str
    decision_varnames : list of str
    outcome_varnames : list of str
    base_filename : str, optional
    """

    def __init__(
        self,
        directory,
        decision_varnames,
        outcome_varnames,
        base_filename="archives.tar.gz",
    ):
        """Init."""
        super().__init__("archive_logger")

        self.directory = os.path.abspath(directory)
        self.temp = os.path.join(self.directory, "tmp")
        os.makedirs(self.temp, exist_ok=True)

        self.base = base_filename
        self.decision_varnames = decision_varnames
        self.outcome_varnames = outcome_varnames
        self.tarfilename = os.path.join(self.directory, base_filename)

    def __call__(self, optimizer):  # noqa: D102
        archive = to_dataframe(
            optimizer.result, self.decision_varnames, self.outcome_varnames
        )
        archive.to_csv(os.path.join(self.temp, f"{optimizer.nfe}.csv"), index=False)

    def reset(self):  # noqa: D102
        # FIXME what needs to go here?
        pass

    def get_results(self):  # noqa: D102
        with tarfile.open(self.tarfilename, "w:gz") as z:
            z.add(self.temp, arcname=os.path.basename(self.temp))

        shutil.rmtree(self.temp)
        return None

    @classmethod
    def load_archives(cls, filename):
        """Load the archives stored with the ArchiveLogger.

        Parameters
        ----------
        filename : str
                   relative path to file

        Returns:
        -------
        dict with nfe as key and dataframe as value
        """
        archives = {}
        with tarfile.open(os.path.abspath(filename)) as fh:
            for entry in fh.getmembers():
                if entry.name.endswith("csv"):
                    key = entry.name.split("/")[1][:-4]
                    archives[int(key)] = pd.read_csv(fh.extractfile(entry))
        return archives


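# Illustrative round trip: during the run one csv file per convergence check is
# written to <directory>/tmp; get_results() bundles these into the tar.gz file
# and removes the temporary folder. The snapshots can be recovered later with
#
#     archives = ArchiveLogger.load_archives("./some_directory/archives.tar.gz")
#
# which returns a dict mapping nfe to the archive DataFrame at that point.
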
class OperatorProbabilities(AbstractConvergenceMetric):
    """OperatorProbability convergence tracker for use with auto-adaptive operator selection.

    Parameters
    ----------
    name : str
    index : int


    State of the art MOEAs like Borg (and GenerationalBorg provided by the workbench)
    use auto-adaptive operator selection. The algorithm has multiple different evolutionary
    operators. Over the run, it tracks how well each operator is doing in producing fitter
    offspring. The probability of the algorithm using a given evolutionary operator is
    proportional to how well this operator has been doing in producing fitter offspring in
    recent generations. This class can be used to track these probabilities over the
    run of the algorithm.

    """

    def __init__(self, name, index):  # noqa: D107
        super().__init__(name)
        self.index = index

    def __call__(self, optimizer):  # noqa: D102
        try:
            props = optimizer.algorithm.variator.probabilities
            self.results.append(props[self.index])
        except AttributeError:
            pass


def epsilon_nondominated(results, epsilons, problem):
    """Merge the list of results into a single set of non-dominated results using the provided epsilon values.

    Parameters
    ----------
    results : list of DataFrames
    epsilons : epsilon values for each objective
    problem : PlatypusProblem instance

    Returns:
    -------
    DataFrame

    Notes:
    -----
    this is a platypus based alternative to pareto.py (https://github.com/matthewjwoodruff/pareto.py)
    """
    if problem.nobjs != len(epsilons):
        raise ValueError(
            f"The number of epsilon values ({len(epsilons)}) must match the number of objectives ({problem.nobjs})"
        )

    results = pd.concat(results, ignore_index=True)
    solutions = rebuild_platypus_population(results, problem)
    archive = EpsilonBoxArchive(epsilons)
    archive += solutions

    return to_dataframe(archive, problem.parameter_names, problem.outcome_names)


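# Merging sketch (illustrative): combining several independent runs on the same
# problem into one epsilon-non-dominated reference set:
#
#     problem = to_problem(model, searchover="levers")
#     reference_set = epsilon_nondominated(
#         [results_run1, results_run2], epsilons=[0.05, 0.05], problem=problem
#     )
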
class Convergence(ProgressTrackingMixIn):
    """Helper class for tracking convergence of optimization."""

    valid_metrics = {"hypervolume", "epsilon_progress", "archive_logger"}

    def __init__(
        self,
        metrics,
        max_nfe,
        convergence_freq=1000,
        logging_freq=5,
        log_progress=False,
    ):
        """Init."""
        super().__init__(
            max_nfe,
            logging_freq,
            _logger,
            log_progress=log_progress,
            log_func=lambda self: f"generation {self.generation}, {self.i}/{self.max_nfe}",
        )

        self.max_nfe = max_nfe
        self.generation = -1
        self.index = []
        self.last_check = 0

        if metrics is None:
            metrics = []

        self.metrics = metrics
        self.convergence_freq = convergence_freq
        self.logging_freq = logging_freq

        # TODO what is the point of this code?
        for metric in metrics:
            assert isinstance(metric, AbstractConvergenceMetric)
            metric.reset()

    def __call__(self, optimizer, force=False):
        """Store convergence information given the specified convergence frequency.

        Parameters
        ----------
        optimizer : platypus optimizer instance
        force : boolean, optional
                if True, convergence information will always be stored
                if False, convergence information will be stored if the
                number of nfe since the last time of storing is equal to
                or higher than convergence_freq


        the primary use case for force is to ensure that convergence information
        is stored once the stopping condition of the optimizer has been reached,
        so that the final convergence information is kept.

        """
        nfe = optimizer.nfe
        super().__call__(nfe - self.i)

        self.generation += 1

        if (
            (nfe >= self.last_check + self.convergence_freq)
            or (self.last_check == 0)
            or force
        ):
            self.index.append(nfe)
            self.last_check = nfe

            for metric in self.metrics:
                metric(optimizer)

    def to_dataframe(self):  # noqa: D102
        progress = {
            metric.name: result
            for metric in self.metrics
            if (result := metric.get_results())
        }

        progress = pd.DataFrame.from_dict(progress)

        if not progress.empty:
            progress["nfe"] = self.index

        return progress


def rebuild_platypus_population(archive, problem):
    """Rebuild a population of platypus Solution instances.

    Parameters
    ----------
    archive : DataFrame
    problem : PlatypusProblem instance

    Returns:
    -------
    list of platypus Solutions

    """
    expected_columns = problem.nvars + problem.nobjs
    actual_columns = len(archive.columns)

    if actual_columns != expected_columns:
        raise EMAError(
            f"The number of columns in the archive ({actual_columns}) does not match the "
            f"expected number of decision variables and objectives ({expected_columns})."
        )

    solutions = []
    for row in archive.itertuples():
        try:
            decision_variables = [
                getattr(row, attr) for attr in problem.parameter_names
            ]
        except AttributeError as e:
            missing_parameters = [
                attr for attr in problem.parameter_names if not hasattr(row, attr)
            ]
            raise EMAError(f"Parameter names {missing_parameters} not found in archive") from e

        try:
            objectives = [getattr(row, attr) for attr in problem.outcome_names]
        except AttributeError as e:
            missing_outcomes = [
                attr for attr in problem.outcome_names if not hasattr(row, attr)
            ]
            raise EMAError(f"Outcome names {missing_outcomes} not found in archive") from e

        solution = Solution(problem)
        solution.variables[:] = [
            platypus_type.encode(value)
            for platypus_type, value in zip(problem.types, decision_variables)
        ]
        solution.objectives[:] = objectives
        solutions.append(solution)
    return solutions


class CombinedVariator(Variator):
    """Combined variator."""

    def __init__(self, crossover_prob=0.5, mutation_prob=1):
        super().__init__(2)
        self.SBX = platypus.SBX()
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob

    def evolve(self, parents):
        """Create two offspring from two parents through crossover and mutation."""
        child1 = copy.deepcopy(parents[0])
        child2 = copy.deepcopy(parents[1])
        problem = child1.problem

        # crossover
        # we will evolve the individual
        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.crossover_prob:
                klass = kind.__class__
                child1, child2 = self._crossover[klass](self, child1, child2, i, kind)
                child1.evaluated = False
                child2.evaluated = False

        # mutate
        for child in [child1, child2]:
            self.mutate(child)

        return [child1, child2]

    def mutate(self, child):
        """Mutate each decision variable of child with mutation_prob."""
        problem = child.problem

        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.mutation_prob:
                klass = kind.__class__
                child = self._mutate[klass](self, child, i, kind)
                child.evaluated = False

    def crossover_real(self, child1, child2, i, type):  # @ReservedAssignment
        # simulated binary crossover (SBX)
        x1 = float(child1.variables[i])
        x2 = float(child2.variables[i])
        lb = type.min_value
        ub = type.max_value

        x1, x2 = self.SBX.sbx_crossover(x1, x2, lb, ub)

        child1.variables[i] = x1
        child2.variables[i] = x2

        return child1, child2

    def crossover_integer(self, child1, child2, i, type):  # @ReservedAssignment
        # half uniform crossover (HUX)
        for j in range(type.nbits):
            if child1.variables[i][j] != child2.variables[i][j]:  # noqa: SIM102
                if bool(random.getrandbits(1)):
                    child1.variables[i][j] = not child1.variables[i][j]
                    child2.variables[i][j] = not child2.variables[i][j]
        return child1, child2

    def crossover_categorical(self, child1, child2, i, type):  # @ReservedAssignment
        # subset crossover (SSX)
        # can probably be implemented in a simpler manner, since the size
        # of the subset is fixed to 1

        s1 = set(child1.variables[i])
        s2 = set(child2.variables[i])

        for j in range(type.size):
            if (
                (child2.variables[i][j] not in s1)
                and (child1.variables[i][j] not in s2)
                and (random.random() < 0.5)
            ):
                temp = child1.variables[i][j]
                child1.variables[i][j] = child2.variables[i][j]
                child2.variables[i][j] = temp

        return child1, child2

    def mutate_real(self, child, i, type, distribution_index=20):  # @ReservedAssignment
        # polynomial mutation (PM)
        x = child.variables[i]
        lower = type.min_value
        upper = type.max_value

        u = random.random()
        dx = upper - lower

        if u < 0.5:
            bl = (x - lower) / dx
            b = 2.0 * u + (1.0 - 2.0 * u) * pow(1.0 - bl, distribution_index + 1.0)
            delta = pow(b, 1.0 / (distribution_index + 1.0)) - 1.0
        else:
            bu = (upper - x) / dx
            b = 2.0 * (1.0 - u) + 2.0 * (u - 0.5) * pow(
                1.0 - bu, distribution_index + 1.0
            )
            delta = 1.0 - pow(b, 1.0 / (distribution_index + 1.0))

        x = x + delta * dx
        x = max(lower, min(x, upper))

        child.variables[i] = x
        return child

    def mutate_integer(self, child, i, type, probability=1):  # @ReservedAssignment
        # bit flip mutation
        for j in range(type.nbits):
            if random.random() <= probability:
                child.variables[i][j] = not child.variables[i][j]
        return child

    def mutate_categorical(self, child, i, type):  # @ReservedAssignment
        # replace a random member of the subset with a random non-member
        probability = 1 / type.size

        if random.random() <= probability:
            subset = child.variables[i]

            if len(subset) < len(type.elements):
                j = random.randrange(len(subset))

                nonmembers = list(set(type.elements) - set(subset))
                k = random.randrange(len(nonmembers))
                subset[j] = nonmembers[k]

            child.variables[i] = subset

        return child

    _crossover = {
        Real: crossover_real,
        Integer: crossover_integer,
        Subset: crossover_categorical,
    }

    _mutate = {
        Real: mutate_real,
        Integer: mutate_integer,
        Subset: mutate_categorical,
    }


def _optimize(
    problem,
    evaluator,
    algorithm,
    convergence,
    nfe,
    convergence_freq,
    logging_freq,
    variator=None,
    **kwargs,
):
    """Helper function for optimization."""
    klass = problem.types[0].__class__

    try:
        eps_values = kwargs["epsilons"]
    except KeyError:
        pass
    else:
        if len(eps_values) != len(problem.outcome_names):
            raise EMAError("Number of epsilon values does not match number of outcomes")

    if variator is None:
        if all(isinstance(t, klass) for t in problem.types):
            # all decision variables are of the same type, so platypus
            # can select a suitable default variator by itself
            variator = None
        else:
            variator = CombinedVariator()

    optimizer = algorithm(
        problem, evaluator=evaluator, variator=variator, log_frequency=500, **kwargs
    )

    convergence = Convergence(
        convergence, nfe, convergence_freq=convergence_freq, logging_freq=logging_freq
    )
    callback = functools.partial(convergence, optimizer)
    evaluator.callback = callback

    with temporary_filter(name=[callbacks.__name__, evaluators.__name__], level=INFO):
        optimizer.run(nfe)

    convergence(optimizer, force=True)

    results = to_dataframe(
        optimizer.result, problem.parameter_names, problem.outcome_names
    )
    convergence = convergence.to_dataframe()

    message = "optimization completed, found {} solutions"
    _logger.info(message.format(len(optimizer.archive)))

    if convergence.empty:
        return results
    else:
        return results, convergence


class BORGDefaultDescriptor:
    """Descriptor used by Borg."""

    # this treats defaults as class level attributes!

    def __init__(self, default_function):
        self.default_function = default_function

    def __get__(self, instance, owner):
        return self.default_function(instance.problem.nvars)

    def __set_name__(self, owner, name):
        self.name = name


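# Illustrative note: because pm_p and um_p below are BORGDefaultDescriptor
# instances, reading them on a GenerationalBorg recomputes the Borg default of
# 1/n from the problem size; e.g. with 10 decision variables, self.pm_p
# evaluates to 1 / 10 = 0.1.
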
class GenerationalBorg(EpsilonProgressContinuation):
    """A generational implementation of the BORG Framework.

    This algorithm adopts Epsilon Progress Continuation, and Auto Adaptive
    Operator Selection, but embeds them within the NSGAII generational
    algorithm, rather than the steady state implementation used by the BORG
    algorithm.

    The parametrization of all operators is based on the default values as used
    in Borg 1.9.

    Note:: limited to RealParameters only.

    """

    pm_p = BORGDefaultDescriptor(lambda x: 1 / x)
    pm_dist = 20

    sbx_prop = 1
    sbx_dist = 15

    de_rate = 0.1
    de_stepsize = 0.5

    um_p = BORGDefaultDescriptor(lambda x: 1 / x)

    spx_nparents = 10
    spx_noffspring = 2
    spx_expansion = 0.3

    pcx_nparents = 10
    pcx_noffspring = 2
    pcx_eta = 0.1
    pcx_zeta = 0.1

    undx_nparents = 10
    undx_noffspring = 2
    undx_zeta = 0.5
    undx_eta = 0.35

    def __init__(
        self,
        problem,
        epsilons,
        population_size=100,
        generator=RandomGenerator(),  # noqa: B008
        selector=TournamentSelector(2),  # noqa: B008
        variator=None,
        **kwargs,
    ):
        """Init."""
        self.problem = problem

        # Parameterization taken from
        # Borg: An Auto-Adaptive MOEA Framework - Hadka, Reed
        variators = [
            GAOperator(
                SBX(probability=self.sbx_prop, distribution_index=self.sbx_dist),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                PCX(
                    nparents=self.pcx_nparents,
                    noffspring=self.pcx_noffspring,
                    eta=self.pcx_eta,
                    zeta=self.pcx_zeta,
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                DifferentialEvolution(
                    crossover_rate=self.de_rate, step_size=self.de_stepsize
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                UNDX(
                    nparents=self.undx_nparents,
                    noffspring=self.undx_noffspring,
                    zeta=self.undx_zeta,
                    eta=self.undx_eta,
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                SPX(
                    nparents=self.spx_nparents,
                    noffspring=self.spx_noffspring,
                    expansion=self.spx_expansion,
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            UM(probability=self.um_p),
        ]

        variator = Multimethod(self, variators)

        super().__init__(
            NSGAII(
                problem,
                population_size,
                generator,
                selector,
                variator,
                EpsilonBoxArchive(epsilons),
                **kwargs,
            )
        )