
quaquel / EMAworkbench · build 18194452655

02 Oct 2025 01:27PM UTC · coverage: 88.664% (+0.5%) from 88.199%
push · github · web-flow

Simplification of classes and functions related to sampling (#420)

525 of 549 new or added lines in 37 files covered (95.63%).
10 existing lines in 4 files are now uncovered.
7853 of 8857 relevant lines covered (88.66%).
0.89 hits per line.

Source File: /ema_workbench/em_framework/optimization.py
"""Wrapper around platypus-opt."""

import abc
import copy
import functools
import os
import random
import shutil
import tarfile
import warnings
from collections.abc import Callable, Iterable

import numpy as np
import pandas as pd

from ..util import INFO, EMAError, get_module_logger, temporary_filter
from . import callbacks, evaluators
from .model import AbstractModel
from .outcomes import AbstractOutcome
from .parameters import (
    BooleanParameter,
    CategoricalParameter,
    IntegerParameter,
    RealParameter,
)
from .points import Sample
from .util import ProgressTrackingMixIn, determine_objects

try:
    import platypus
    from platypus import (
        NSGAII,
        PCX,
        PM,
        SBX,
        SPX,
        UM,
        UNDX,
        Algorithm,
        DifferentialEvolution,
        EpsilonBoxArchive,
        EpsilonIndicator,
        EpsilonProgressContinuation,
        EpsNSGAII,
        GAOperator,
        GenerationalDistance,
        Hypervolume,
        Integer,
        InvertedGenerationalDistance,
        Multimethod,
        RandomGenerator,
        Real,
        Solution,
        Spacing,
        Subset,
        TournamentSelector,
        Variator,
    )  # @UnresolvedImport
    from platypus import Problem as PlatypusProblem


except ImportError:
    warnings.warn(
        "Platypus based optimization not available. Install with `pip install platypus-opt`",
        ImportWarning,
        stacklevel=2,
    )

    class PlatypusProblem:
        constraints = []

        def __init__(self, *args, **kwargs):
            pass

    class Variator:
        def __init__(self, *args, **kwargs):
            pass

    class RandomGenerator:
        def __call__(self, *args, **kwargs):
            pass

    class TournamentSelector:
        def __init__(self, *args, **kwargs):
            pass

        def __call__(self, *args, **kwargs):
            pass

    class EpsilonProgressContinuation:
        pass

    EpsNSGAII = None
    platypus = None
    Real = Integer = Subset = None

# Created on 5 Jun 2017
#
# .. codeauthor::jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>

__all__ = [
    "ArchiveLogger",
    "Convergence",
    "EpsilonIndicatorMetric",
    "EpsilonProgress",
    "GenerationalDistanceMetric",
    "HypervolumeMetric",
    "InvertedGenerationalDistanceMetric",
    "OperatorProbabilities",
    "Problem",
    "RobustProblem",
    "SpacingMetric",
    "epsilon_nondominated",
    "rebuild_platypus_population",
    "to_problem",
    "to_robust_problem",
]
_logger = get_module_logger(__name__)


class Problem(PlatypusProblem):
    """Small extension to Platypus problem object.

    Includes information on the names of the decision variables, the names of the outcomes,
    and the type of search.
    """

    @property
    def parameter_names(self):
        """Getter for parameter names."""
        return [e.name for e in self.parameters]

    def __init__(
        self, searchover, parameters, outcome_names, constraints, reference=None
    ):
        """Init."""
        if constraints is None:
            constraints = []

        super().__init__(len(parameters), len(outcome_names), nconstrs=len(constraints))
        #         assert len(parameters) == len(parameter_names)
        assert searchover in ("levers", "uncertainties", "robust")

        if searchover == "levers" or searchover == "uncertainties":
            assert not reference or isinstance(reference, Sample)
        else:
            assert not reference

        self.searchover = searchover
        self.parameters = parameters
        self.outcome_names = outcome_names
        self.ema_constraints = constraints
        self.constraint_names = [c.name for c in constraints]
        self.reference = reference if reference else 0


class RobustProblem(Problem):
    """Small extension to Problem object for robust optimization.

    Adds the scenarios and the robustness functions.
    """

    def __init__(
        self, parameters, outcome_names, scenarios, robustness_functions, constraints
    ):
        """Init."""
        super().__init__("robust", parameters, outcome_names, constraints)
        assert len(robustness_functions) == len(outcome_names)
        self.scenarios = scenarios
        self.robustness_functions = robustness_functions


def to_problem(
    model: AbstractModel,
    searchover: str,
    reference: Sample | None = None,
    constraints=None,
):
    """Helper function to create Problem object.

    Parameters
    ----------
    model : AbstractModel instance
    searchover : str
    reference : Sample instance, optional
                overwrite the default scenario in case of searching over
                levers, or default policy in case of searching over
                uncertainties
    constraints : list, optional

    Returns
    -------
    Problem instance

    """
    # extract the levers and the outcomes
    decision_variables = determine_objects(model, searchover, union=True)

    outcomes = determine_objects(model, "outcomes")
    outcomes = [outcome for outcome in outcomes if outcome.kind != AbstractOutcome.INFO]
    outcome_names = [outcome.name for outcome in outcomes]

    if not outcomes:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )

    problem = Problem(
        searchover, decision_variables, outcome_names, constraints, reference=reference
    )
    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in outcomes]
    problem.constraints[:] = "==0"

    return problem

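# Illustrative sketch (not part of the module): constructing a Problem for a
# hypothetical model with two levers and two outcomes, e.g. as input for
# epsilon_nondominated or the convergence metrics defined below. The model,
# lever, and outcome names are assumptions for illustration only.
#
#     from ema_workbench import Model, RealParameter, ScalarOutcome
#
#     model = Model("example", function=my_model_function)  # my_model_function is hypothetical
#     model.levers = [RealParameter("l1", 0, 1), RealParameter("l2", 0, 10)]
#     model.outcomes = [
#         ScalarOutcome("y1", kind=ScalarOutcome.MAXIMIZE),
#         ScalarOutcome("y2", kind=ScalarOutcome.MINIMIZE),
#     ]
#     problem = to_problem(model, searchover="levers")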

def to_robust_problem(model, scenarios, robustness_functions, constraints=None):
    """Helper function to create RobustProblem object.

    Parameters
    ----------
    model : AbstractModel instance
    scenarios : collection
    robustness_functions : iterable of ScalarOutcomes
    constraints : list, optional

    Returns
    -------
    RobustProblem instance

    """
    # extract the levers and the outcomes
    decision_variables = determine_objects(model, "levers", union=True)

    outcomes = robustness_functions
    outcomes = [outcome for outcome in outcomes if outcome.kind != AbstractOutcome.INFO]
    outcome_names = [outcome.name for outcome in outcomes]

    if not outcomes:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )

    problem = RobustProblem(
        decision_variables, outcome_names, scenarios, robustness_functions, constraints
    )

    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in outcomes]
    problem.constraints[:] = "==0"

    return problem

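# Illustrative sketch (not part of the module): a robustness function is a
# ScalarOutcome whose function aggregates a model outcome over the sampled
# scenarios. All names below are assumptions for illustration only.
#
#     import numpy as np
#     from ema_workbench import ScalarOutcome
#
#     scenarios = ...  # a collection of sampled scenarios
#     robustness_functions = [
#         ScalarOutcome(
#             "mean y1",
#             kind=ScalarOutcome.MAXIMIZE,
#             variable_name="y1",
#             function=np.mean,
#         ),
#     ]
#     problem = to_robust_problem(model, scenarios, robustness_functions)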

def to_platypus_types(decision_variables):
    """Helper function for mapping from workbench parameter types to platypus parameter types."""
    # TODO:: should categorical not be platypus.Subset, with size == 1?
    _type_mapping = {
        RealParameter: platypus.Real,
        IntegerParameter: platypus.Integer,
        CategoricalParameter: platypus.Subset,
        BooleanParameter: platypus.Subset,
    }

    types = []
    for dv in decision_variables:
        klass = _type_mapping[type(dv)]

        if not isinstance(dv, (CategoricalParameter | BooleanParameter)):
            decision_variable = klass(dv.lower_bound, dv.upper_bound)
        else:
            decision_variable = klass(dv.categories, 1)

        types.append(decision_variable)
    return types


def to_dataframe(solutions, dvnames, outcome_names):
    """Helper function to turn a collection of platypus Solution instances into a pandas DataFrame.

    Parameters
    ----------
    solutions : collection of Solution instances
    dvnames : list of str
    outcome_names : list of str

    Returns
    -------
    pandas DataFrame
    """
    results = []
    for solution in platypus.unique(solutions):
        vars = transform_variables(
            solution.problem, solution.variables
        )  # @ReservedAssignment

        decision_vars = dict(zip(dvnames, vars))
        decision_out = dict(zip(outcome_names, solution.objectives))

        result = decision_vars.copy()
        result.update(decision_out)

        results.append(result)

    results = pd.DataFrame(results, columns=dvnames + outcome_names)
    return results


def process_uncertainties(jobs):
    """Helper function to map jobs generated by platypus to Sample instances representing scenarios.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem
    scenarios = []

    jobs = _process(jobs, problem)
    for i, job in enumerate(jobs):
        name = str(i)
        scenario = Sample(name=name, **job)
        scenarios.append(scenario)

    policies = problem.reference

    return scenarios, policies


def process_levers(jobs):
    """Helper function to map jobs generated by platypus to Sample instances.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem
    policies = []
    jobs = _process(jobs, problem)
    for i, job in enumerate(jobs):
        name = str(i)
        policy = Sample(name=name, **job)
        policies.append(policy)

    scenarios = problem.reference

    return scenarios, policies


def _process(jobs, problem):
    """Helper function to transform platypus job to dict with correct values for workbench."""
    processed_jobs = []
    for job in jobs:
        variables = transform_variables(problem, job.solution.variables)
        processed_job = {}
        for param, var in zip(problem.parameters, variables):
            try:
                var = var.value  # noqa: PLW2901
            except AttributeError:
                pass
            processed_job[param.name] = var
        processed_jobs.append(processed_job)
    return processed_jobs


def process_robust(jobs):
    """Helper function to process robust optimization jobs.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    _, policies = process_levers(jobs)
    scenarios = jobs[0].solution.problem.scenarios

    return scenarios, policies


def transform_variables(problem, variables):
    """Helper function for transforming platypus variables."""
    converted_vars = []
    for type, var in zip(problem.types, variables):  # @ReservedAssignment
        var = type.decode(var)  # noqa: PLW2901
        try:
            var = var[0]  # noqa: PLW2901
        except TypeError:
            pass

        converted_vars.append(var)
    return converted_vars


def evaluate(jobs_collection, experiments, outcomes, problem):
    """Helper function for mapping the results from perform_experiments back to what platypus needs."""
    searchover = problem.searchover
    outcome_names = problem.outcome_names
    constraints = problem.ema_constraints

    column = "policy" if searchover == "levers" else "scenario"

    for entry, job in jobs_collection:
        logical = experiments[column] == entry.name

        job_outputs = {}
        for k, v in outcomes.items():
            job_outputs[k] = v[logical][0]

        # TODO:: only retain uncertainties
        job_experiment = experiments[logical]
        job_constraints = _evaluate_constraints(
            job_experiment, job_outputs, constraints
        )
        job_outcomes = [job_outputs[key] for key in outcome_names]

        if job_constraints:
            job.solution.problem.function = lambda _: (
                job_outcomes,
                job_constraints,
            )
        else:
            job.solution.problem.function = lambda _: job_outcomes  # noqa: B023
        job.solution.evaluate()


def evaluate_robust(jobs_collection, experiments, outcomes, problem):
    """Helper function for mapping the results from perform_experiments back to what Platypus needs."""
    robustness_functions = problem.robustness_functions
    constraints = problem.ema_constraints

    for entry, job in jobs_collection:
        logical = experiments["policy"] == entry.name

        job_outcomes_dict = {}
        job_outcomes = []
        for rf in robustness_functions:
            data = [outcomes[var_name][logical] for var_name in rf.variable_name]
            score = rf.function(*data)
            job_outcomes_dict[rf.name] = score
            job_outcomes.append(score)

        # TODO:: only retain levers
        job_experiment = experiments[logical].iloc[0]
        job_constraints = _evaluate_constraints(
            job_experiment, job_outcomes_dict, constraints
        )

        if job_constraints:
            job.solution.problem.function = lambda _: (
                job_outcomes,
                job_constraints,
            )
        else:
            job.solution.problem.function = lambda _: job_outcomes  # noqa: B023

        job.solution.evaluate()


def _evaluate_constraints(job_experiment, job_outcomes, constraints):
    """Helper function for evaluating the constraints for a given job."""
    job_constraints = []
    for constraint in constraints:
        data = [job_experiment[var] for var in constraint.parameter_names]
        data += [job_outcomes[var] for var in constraint.outcome_names]
        constraint_value = constraint.process(data)
        job_constraints.append(constraint_value)
    return job_constraints


class AbstractConvergenceMetric(abc.ABC):
    """Base convergence metric class."""

    def __init__(self, name):
        """Init."""
        super().__init__()
        self.name = name
        self.results = []

    @abc.abstractmethod
    def __call__(self, optimizer):
        """Call the convergence metric."""

    def reset(self):
        self.results = []

    def get_results(self):
        return self.results


class EpsilonProgress(AbstractConvergenceMetric):
    """Epsilon progress convergence metric class."""

    def __init__(self):
        """Init."""
        super().__init__("epsilon_progress")

    def __call__(self, optimizer):  # noqa: D102
        self.results.append(optimizer.archive.improvements)

515

516
class MetricWrapper:
1✔
517
    """Wrapper class for wrapping platypus indicators.
518

519
    Parameters
520
    ----------
521
    reference_set : DataFrame
522
    problem : PlatypusProblem instance
523
    kwargs : dict
524
             any additional keyword arguments to be passed
525
             on to the wrapper platypus indicator class
526

527
    Notes
528
    -----
529
    this class relies on multi-inheritance and careful consideration
530
    of the MRO to conveniently wrap the convergence metrics provided
531
    by platypus.
532

533
    """
534

535
    def __init__(self, reference_set, problem, **kwargs):
1✔
536
        self.problem = problem
×
537
        reference_set = rebuild_platypus_population(reference_set, self.problem)
×
538
        super().__init__(reference_set=reference_set, **kwargs)
×
539

540
    def calculate(self, archive):
1✔
541
        solutions = rebuild_platypus_population(archive, self.problem)
×
542
        return super().calculate(solutions)
×
543

544

545
class HypervolumeMetric(MetricWrapper, Hypervolume):
1✔
546
    """Hypervolume metric.
547

548
    Parameters
549
    ----------
550
    reference_set : DataFrame
551
    problem : PlatypusProblem instance
552

553
    this is a thin wrapper around Hypervolume as provided
554
    by platypus to make it easier to use in conjunction with the
555
    workbench.
556

557
    """
558

559
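# Illustrative sketch (not part of the module): computing hypervolume in post
# from archives saved by ArchiveLogger (defined further below). Assumes
# `reference_set` is a DataFrame of solutions (e.g. from epsilon_nondominated)
# and `model` matches the logged optimization.
#
#     archives = ArchiveLogger.load_archives("./archives.tar.gz")
#     problem = to_problem(model, searchover="levers")
#     hv = HypervolumeMetric(reference_set, problem)
#     scores = [
#         (nfe, hv.calculate(archive)) for nfe, archive in sorted(archives.items())
#     ]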

class GenerationalDistanceMetric(MetricWrapper, GenerationalDistance):
    """GenerationalDistance metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the generational distance function


    This is a thin wrapper around GenerationalDistance as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    See https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information.

    """


class InvertedGenerationalDistanceMetric(MetricWrapper, InvertedGenerationalDistance):
    """InvertedGenerationalDistance metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the inverted generational distance function


    This is a thin wrapper around InvertedGenerationalDistance as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    See https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information.

    """


class EpsilonIndicatorMetric(MetricWrapper, EpsilonIndicator):
    """EpsilonIndicator metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance


    This is a thin wrapper around EpsilonIndicator as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    """


class SpacingMetric(MetricWrapper, Spacing):
    """Spacing metric.

    Parameters
    ----------
    problem : PlatypusProblem instance


    This is a thin wrapper around Spacing as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    """

    def __init__(self, problem):
        self.problem = problem


class HyperVolume(AbstractConvergenceMetric):
    """Hypervolume convergence metric class.

    This metric is derived from a hyper-volume measure, which describes the
    multi-dimensional volume of space contained within the pareto front. When
    computed with minimums and maximums, it describes the ratio of dominated
    outcomes to all possible outcomes in the extent of the space. Getting this
    number to be high or low is not necessarily important, as not all outcomes
    within the min-max range will be feasible. But, having the hypervolume remain
    fairly stable over multiple generations of the evolutionary algorithm provides
    an indicator of convergence.

    Parameters
    ----------
    minimum : numpy array
    maximum : numpy array


    This class is deprecated and will be removed in version 3.0 of the EMAworkbench.
    Use ArchiveLogger instead and calculate hypervolume in post using HypervolumeMetric
    as also shown in the directed search tutorial.

    """

    def __init__(self, minimum, maximum):
        super().__init__("hypervolume")
        warnings.warn(
            "HyperVolume is deprecated and will be removed in version 3.0 of the EMAworkbench. "
            "Use ArchiveLogger and HypervolumeMetric instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.hypervolume_func = Hypervolume(minimum=minimum, maximum=maximum)

    def __call__(self, optimizer):
        self.results.append(self.hypervolume_func.calculate(optimizer.archive))

    @classmethod
    def from_outcomes(cls, outcomes):
        ranges = [o.expected_range for o in outcomes if o.kind != o.INFO]
        minimum, maximum = np.asarray(list(zip(*ranges)))
        return cls(minimum, maximum)


class ArchiveLogger(AbstractConvergenceMetric):
    """Helper class to write the archive to disk at each iteration.

    Parameters
    ----------
    directory : str
    decision_varnames : list of str
    outcome_varnames : list of str
    base_filename : str, optional
    """

    def __init__(
        self,
        directory,
        decision_varnames,
        outcome_varnames,
        base_filename="archives.tar.gz",
    ):
        """Init."""
        super().__init__("archive_logger")

        self.directory = os.path.abspath(directory)
        self.temp = os.path.join(self.directory, "tmp")
        os.makedirs(self.temp, exist_ok=True)

        self.base = base_filename
        self.decision_varnames = decision_varnames
        self.outcome_varnames = outcome_varnames
        self.tarfilename = os.path.join(self.directory, base_filename)

        # self.index = 0

    def __call__(self, optimizer):  # noqa: D102
        archive = to_dataframe(
            optimizer.result, self.decision_varnames, self.outcome_varnames
        )
        archive.to_csv(os.path.join(self.temp, f"{optimizer.nfe}.csv"), index=False)

    def reset(self):  # noqa: D102
        # FIXME what needs to go here?
        pass

    def get_results(self):  # noqa: D102
        with tarfile.open(self.tarfilename, "w:gz") as z:
            z.add(self.temp, arcname=os.path.basename(self.temp))

        shutil.rmtree(self.temp)
        return None

    @classmethod
    def load_archives(cls, filename):
        """Load the archives stored with the ArchiveLogger.

        Parameters
        ----------
        filename : str
                   relative path to file

        Returns
        -------
        dict with nfe as key and dataframe as value
        """
        archives = {}
        with tarfile.open(os.path.abspath(filename)) as fh:
            for entry in fh.getmembers():
                if entry.name.endswith("csv"):
                    key = entry.name.split("/")[1][:-4]
                    archives[int(key)] = pd.read_csv(fh.extractfile(entry))
        return archives

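# Illustrative sketch (not part of the module): logging archives during an
# optimization run and loading them afterwards. The directory, epsilon values,
# and nfe are assumptions for illustration.
#
#     from ema_workbench import MultiprocessingEvaluator
#
#     convergence = [
#         ArchiveLogger(
#             "./archives",
#             [lever.name for lever in model.levers],
#             [outcome.name for outcome in model.outcomes],
#         ),
#         EpsilonProgress(),
#     ]
#     with MultiprocessingEvaluator(model) as evaluator:
#         results, convergence_data = evaluator.optimize(
#             nfe=10000,
#             searchover="levers",
#             epsilons=[0.05, 0.05],
#             convergence=convergence,
#         )
#     archives = ArchiveLogger.load_archives("./archives/archives.tar.gz")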

class OperatorProbabilities(AbstractConvergenceMetric):
    """OperatorProbability convergence tracker for use with auto adaptive operator selection.

    Parameters
    ----------
    name : str
    index : int


    State of the art MOEAs like Borg (and GenerationalBorg provided by the workbench)
    use autoadaptive operator selection. The algorithm has multiple different evolutionary
    operators. Over the run, it tracks how well each operator is doing in producing fitter
    offspring. The probability of the algorithm using a given evolutionary operator is
    proportional to how well this operator has been doing in producing fitter offspring in
    recent generations. This class can be used to track these probabilities over the
    run of the algorithm.

    """

    def __init__(self, name, index):
        super().__init__(name)
        self.index = index

    def __call__(self, optimizer):  # noqa: D102
        try:
            props = optimizer.algorithm.variator.probabilities
            self.results.append(props[self.index])
        except AttributeError:
            pass

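# Illustrative sketch (not part of the module): one OperatorProbabilities
# instance per operator, indexed by its position in the multimethod variator.
# The indices below follow the six operators of GenerationalBorg as defined
# at the bottom of this module.
#
#     convergence = [
#         OperatorProbabilities("SBX", 0),
#         OperatorProbabilities("PCX", 1),
#         OperatorProbabilities("DE", 2),
#         OperatorProbabilities("UNDX", 3),
#         OperatorProbabilities("SPX", 4),
#         OperatorProbabilities("UM", 5),
#     ]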

def epsilon_nondominated(results, epsilons, problem):
    """Merge the list of results into a single set of non dominated results using the provided epsilon values.

    Parameters
    ----------
    results : list of DataFrames
    epsilons : epsilon values for each objective
    problem : PlatypusProblem instance

    Returns
    -------
    DataFrame

    Notes
    -----
    This is a platypus based alternative to pareto.py (https://github.com/matthewjwoodruff/pareto.py).
    """
    if problem.nobjs != len(epsilons):
        raise ValueError(
            f"The number of epsilon values ({len(epsilons)}) must match the number of objectives ({problem.nobjs})"
        )

    results = pd.concat(results, ignore_index=True)
    solutions = rebuild_platypus_population(results, problem)
    archive = EpsilonBoxArchive(epsilons)
    archive += solutions

    return to_dataframe(archive, problem.parameter_names, problem.outcome_names)

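# Illustrative sketch (not part of the module): merging results from several
# seeds into one epsilon-nondominated reference set. `all_results` is assumed
# to be a list of DataFrames returned by separate optimize calls on the same
# problem; the epsilon values are assumptions for illustration.
#
#     problem = to_problem(model, searchover="levers")
#     reference_set = epsilon_nondominated(all_results, [0.05, 0.05], problem)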

class Convergence(ProgressTrackingMixIn):
    """Helper class for tracking convergence of optimization."""

    valid_metrics = {"hypervolume", "epsilon_progress", "archive_logger"}

    def __init__(
        self,
        metrics,
        max_nfe,
        convergence_freq=1000,
        logging_freq=5,
        log_progress=False,
    ):
        """Init."""
        super().__init__(
            max_nfe,
            logging_freq,
            _logger,
            log_progress=log_progress,
            log_func=lambda self: f"generation"
            f" {self.generation}, {self.i}/{self.max_nfe}",
        )

        self.max_nfe = max_nfe
        self.generation = -1
        self.index = []
        self.last_check = 0

        if metrics is None:
            metrics = []

        self.metrics = metrics
        self.convergence_freq = convergence_freq
        self.logging_freq = logging_freq

        # TODO what is the point of this code?
        for metric in metrics:
            assert isinstance(metric, AbstractConvergenceMetric)
            metric.reset()

    def __call__(self, optimizer, force=False):
        """Stores convergence information given the specified convergence frequency.

        Parameters
        ----------
        optimizer : platypus optimizer instance
        force : boolean, optional
                if True, convergence information will always be stored
                if False, convergence information will be stored if the
                number of nfe since the last time of storing is equal to
                or higher than convergence_freq


        The primary use case for force is to force convergence information
        to be stored once the stopping condition of the optimizer has been reached
        so that the final convergence information is kept.

        """
        nfe = optimizer.nfe
        super().__call__(nfe - self.i)

        self.generation += 1

        if (
            (nfe >= self.last_check + self.convergence_freq)
            or (self.last_check == 0)
            or force
        ):
            self.index.append(nfe)
            self.last_check = nfe

            for metric in self.metrics:
                metric(optimizer)

    def to_dataframe(self):  # noqa: D102
        progress = {
            metric.name: result
            for metric in self.metrics
            if (result := metric.get_results())
        }

        progress = pd.DataFrame.from_dict(progress)

        if not progress.empty:
            progress["nfe"] = self.index

        return progress


def rebuild_platypus_population(archive, problem):
    """Rebuild a population of platypus Solution instances.

    Parameters
    ----------
    archive : DataFrame
    problem : PlatypusProblem instance

    Returns
    -------
    list of platypus Solutions

    """
    expected_columns = problem.nvars + problem.nobjs
    actual_columns = len(archive.columns)

    if actual_columns != expected_columns:
        raise EMAError(
            f"The number of columns in the archive ({actual_columns}) does not match the "
            f"expected number of decision variables and objectives ({expected_columns})."
        )

    solutions = []
    for row in archive.itertuples():
        try:
            decision_variables = [
                getattr(row, attr) for attr in problem.parameter_names
            ]
        except AttributeError as e:
            missing_parameters = [
                attr for attr in problem.parameter_names if not hasattr(row, attr)
            ]
            raise EMAError(
                f"Parameter names {missing_parameters} not found in archive"
            ) from e

        try:
            objectives = [getattr(row, attr) for attr in problem.outcome_names]
        except AttributeError as e:
            missing_outcomes = [
                attr for attr in problem.outcome_names if not hasattr(row, attr)
            ]
            raise EMAError(
                f"Outcome names {missing_outcomes} not found in archive"
            ) from e

        solution = Solution(problem)
        solution.variables[:] = [
            platypus_type.encode(value)
            for platypus_type, value in zip(problem.types, decision_variables)
        ]
        solution.objectives[:] = objectives
        solutions.append(solution)
    return solutions

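# Illustrative sketch (not part of the module): rebuilding a platypus
# population from a logged archive so platypus indicators can consume it.
# Assumes `archive` is a DataFrame holding exactly the problem's decision
# variable and outcome columns, named as in `problem`.
#
#     problem = to_problem(model, searchover="levers")
#     solutions = rebuild_platypus_population(archive, problem)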

class CombinedVariator(Variator):
    """Combined variator."""

    def __init__(self, crossover_prob=0.5, mutation_prob=1):
        super().__init__(2)
        self.SBX = platypus.SBX()
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob

    def evolve(self, parents):
        child1 = copy.deepcopy(parents[0])
        child2 = copy.deepcopy(parents[1])
        problem = child1.problem

        # crossover
        # we will evolve the individual
        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.crossover_prob:
                klass = kind.__class__
                child1, child2 = self._crossover[klass](self, child1, child2, i, kind)
                child1.evaluated = False
                child2.evaluated = False

        # mutate
        for child in [child1, child2]:
            self.mutate(child)

        return [child1, child2]

    def mutate(self, child):
        problem = child.problem

        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.mutation_prob:
                klass = kind.__class__
                child = self._mutate[klass](self, child, i, kind)
                child.evaluated = False

    def crossover_real(self, child1, child2, i, type):  # @ReservedAssignment
        # sbx
        x1 = float(child1.variables[i])
        x2 = float(child2.variables[i])
        lb = type.min_value
        ub = type.max_value

        x1, x2 = self.SBX.sbx_crossover(x1, x2, lb, ub)

        child1.variables[i] = x1
        child2.variables[i] = x2

        return child1, child2

    def crossover_integer(self, child1, child2, i, type):  # @ReservedAssignment
        # HUX()
        for j in range(type.nbits):
            if child1.variables[i][j] != child2.variables[i][j]:  # noqa: SIM102
                if bool(random.getrandbits(1)):
                    child1.variables[i][j] = not child1.variables[i][j]
                    child2.variables[i][j] = not child2.variables[i][j]
        return child1, child2

    def crossover_categorical(self, child1, child2, i, type):  # @ReservedAssignment
        # SSX()
        # can probably be implemented in a simple manner, since size
        # of subset is fixed to 1

        s1 = set(child1.variables[i])
        s2 = set(child2.variables[i])

        for j in range(type.size):
            if (
                (child2.variables[i][j] not in s1)
                and (child1.variables[i][j] not in s2)
                and (random.random() < 0.5)
            ):
                temp = child1.variables[i][j]
                child1.variables[i][j] = child2.variables[i][j]
                child2.variables[i][j] = temp

        return child1, child2

    def mutate_real(self, child, i, type, distribution_index=20):  # @ReservedAssignment
        # PM
        x = child.variables[i]
        lower = type.min_value
        upper = type.max_value

        u = random.random()
        dx = upper - lower

        if u < 0.5:
            bl = (x - lower) / dx
            b = 2.0 * u + (1.0 - 2.0 * u) * pow(1.0 - bl, distribution_index + 1.0)
            delta = pow(b, 1.0 / (distribution_index + 1.0)) - 1.0
        else:
            bu = (upper - x) / dx
            b = 2.0 * (1.0 - u) + 2.0 * (u - 0.5) * pow(
                1.0 - bu, distribution_index + 1.0
            )
            delta = 1.0 - pow(b, 1.0 / (distribution_index + 1.0))

        x = x + delta * dx
        x = max(lower, min(x, upper))

        child.variables[i] = x
        return child

    def mutate_integer(self, child, i, type, probability=1):  # @ReservedAssignment
        # bitflip
        for j in range(type.nbits):
            if random.random() <= probability:
                child.variables[i][j] = not child.variables[i][j]
        return child

    def mutate_categorical(self, child, i, type):  # @ReservedAssignment
        # replace
        probability = 1 / type.size

        if random.random() <= probability:
            subset = child.variables[i]

            if len(subset) < len(type.elements):
                j = random.randrange(len(subset))

                nonmembers = list(set(type.elements) - set(subset))
                k = random.randrange(len(nonmembers))
                subset[j] = nonmembers[k]

            child.variables[i] = subset

        return child

    _crossover = {
        Real: crossover_real,
        Integer: crossover_integer,
        Subset: crossover_categorical,
    }

    _mutate = {
        Real: mutate_real,
        Integer: mutate_integer,
        Subset: mutate_categorical,
    }


def _optimize(
    problem: PlatypusProblem,
    evaluator: "BaseEvaluator",  # noqa: F821
    algorithm: type[Algorithm],
    convergence: Iterable[Callable],
    nfe: int,
    convergence_freq: int,
    logging_freq: int,
    variator: Variator = None,
    **kwargs,
):
    """Helper function for optimization."""
    klass = problem.types[0].__class__

    try:
        eps_values = kwargs["epsilons"]
    except KeyError:
        pass
    else:
        if len(eps_values) != len(problem.outcome_names):
            raise EMAError("Number of epsilon values does not match number of outcomes")

    if variator is None:
        if all(isinstance(t, klass) for t in problem.types):
            variator = None
        else:
            variator = CombinedVariator()
    # mutator = CombinedMutator()

    optimizer = algorithm(
        problem, evaluator=evaluator, variator=variator, log_frequency=500, **kwargs
    )
    # optimizer.mutator = mutator

    convergence = Convergence(
        convergence, nfe, convergence_freq=convergence_freq, logging_freq=logging_freq
    )
    callback = functools.partial(convergence, optimizer)
    evaluator.callback = callback

    with temporary_filter(name=[callbacks.__name__, evaluators.__name__], level=INFO):
        optimizer.run(nfe)

    convergence(optimizer, force=True)

    # convergence.pbar.__exit__(None, None, None)

    results = to_dataframe(
        optimizer.result, problem.parameter_names, problem.outcome_names
    )
    convergence = convergence.to_dataframe()

    message = "optimization completed, found {} solutions"
    _logger.info(message.format(len(optimizer.archive)))

    if convergence.empty:
        return results
    else:
        return results, convergence


class BORGDefaultDescriptor:
    """Descriptor used by Borg."""

    # this treats defaults as class level attributes!

    def __init__(self, default_function):
        self.default_function = default_function

    def __get__(self, instance, owner):
        return self.default_function(instance.problem.nvars)

    def __set_name__(self, owner, name):
        self.name = name

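# Illustrative sketch (not part of the module): the descriptor computes its
# value from the owning instance's problem, so an attribute like `pm_p` below
# resolves to 1/nvars at access time rather than at class definition time.
# `_Demo` and `problem_with_4_vars` are hypothetical.
#
#     class _Demo:
#         p = BORGDefaultDescriptor(lambda nvars: 1 / nvars)
#
#         def __init__(self, problem):
#             self.problem = problem  # any object with an `nvars` attribute
#
#     # _Demo(problem_with_4_vars).p == 0.25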

class GenerationalBorg(EpsilonProgressContinuation):
    """A generational implementation of the BORG Framework.

    This algorithm adopts Epsilon Progress Continuation, and Auto Adaptive
    Operator Selection, but embeds them within the NSGAII generational
    algorithm, rather than the steady state implementation used by the BORG
    algorithm.

    The parametrization of all operators is based on the default values as used
    in Borg 1.9.

    Note:: limited to RealParameters only.

    """

    pm_p = BORGDefaultDescriptor(lambda x: 1 / x)
    pm_dist = 20

    sbx_prop = 1
    sbx_dist = 15

    de_rate = 0.1
    de_stepsize = 0.5

    um_p = BORGDefaultDescriptor(lambda x: 1 / x)

    spx_nparents = 10
    spx_noffspring = 2
    spx_expansion = 0.3

    pcx_nparents = 10
    pcx_noffspring = 2
    pcx_eta = 0.1
    pcx_zeta = 0.1

    undx_nparents = 10
    undx_noffspring = 2
    undx_zeta = 0.5
    undx_eta = 0.35

    def __init__(
        self,
        problem,
        epsilons,
        population_size=100,
        generator=RandomGenerator(),  # noqa: B008
        selector=TournamentSelector(2),  # noqa: B008
        variator=None,
        **kwargs,
    ):
        """Init."""
        self.problem = problem

        # Parameterization taken from
        # Borg: An Auto-Adaptive MOEA Framework - Hadka, Reed
        variators = [
            GAOperator(
                SBX(probability=self.sbx_prop, distribution_index=self.sbx_dist),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                PCX(
                    nparents=self.pcx_nparents,
                    noffspring=self.pcx_noffspring,
                    eta=self.pcx_eta,
                    zeta=self.pcx_zeta,
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                DifferentialEvolution(
                    crossover_rate=self.de_rate, step_size=self.de_stepsize
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                UNDX(
                    nparents=self.undx_nparents,
                    noffspring=self.undx_noffspring,
                    zeta=self.undx_zeta,
                    eta=self.undx_eta,
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            GAOperator(
                SPX(
                    nparents=self.spx_nparents,
                    noffspring=self.spx_noffspring,
                    expansion=self.spx_expansion,
                ),
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            ),
            UM(probability=self.um_p),
        ]

        variator = Multimethod(self, variators)

        super().__init__(
            NSGAII(
                problem,
                population_size,
                generator,
                selector,
                variator,
                EpsilonBoxArchive(epsilons),
                **kwargs,
            )
        )

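# Illustrative sketch (not part of the module): GenerationalBorg can be passed
# as the algorithm for an optimization run; the epsilon values and nfe are
# assumptions for illustration.
#
#     from ema_workbench import MultiprocessingEvaluator
#
#     with MultiprocessingEvaluator(model) as evaluator:
#         results = evaluator.optimize(
#             algorithm=GenerationalBorg,
#             nfe=10000,
#             searchover="levers",
#             epsilons=[0.05, 0.05],
#         )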