• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

quaquel / EMAworkbench / 18214982978

03 Oct 2025 06:39AM UTC coverage: 88.703% (+0.04%) from 88.664%
18214982978

Pull #422

github

web-flow
Merge fe026872f into 592d0cd98
Pull Request #422: ruff fixes

53 of 73 new or added lines in 16 files covered. (72.6%)

2 existing lines in 2 files now uncovered.

7852 of 8852 relevant lines covered (88.7%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.01
/ema_workbench/em_framework/optimization.py
1
"""Wrapper around platypus-opt."""
2

3
import abc
1✔
4
import copy
1✔
5
import functools
1✔
6
import os
1✔
7
import random
1✔
8
import shutil
1✔
9
import tarfile
1✔
10
import warnings
1✔
11
from collections.abc import Callable, Iterable
1✔
12

13
import numpy as np
1✔
14
import pandas as pd
1✔
15

16
from ..util import INFO, EMAError, get_module_logger, temporary_filter
1✔
17
from . import callbacks, evaluators
1✔
18
from .model import AbstractModel
1✔
19
from .outcomes import AbstractOutcome
1✔
20
from .parameters import (
1✔
21
    BooleanParameter,
22
    CategoricalParameter,
23
    IntegerParameter,
24
    RealParameter,
25
)
26
from .points import Sample
1✔
27
from .util import ProgressTrackingMixIn, determine_objects
1✔
28

29
# platypus is an optional dependency: when it is missing we warn the user and
# install minimal no-op stand-ins for the names this module subclasses or
# checks against, so that the module can still be imported (the actual
# optimization functionality will not work without platypus-opt).
try:
    import platypus
    from platypus import (
        NSGAII,
        PCX,
        PM,
        SBX,
        SPX,
        UM,
        UNDX,
        Algorithm,
        DifferentialEvolution,
        EpsilonBoxArchive,
        EpsilonIndicator,
        EpsilonProgressContinuation,
        EpsNSGAII,
        GAOperator,
        GenerationalDistance,
        Hypervolume,
        Integer,
        InvertedGenerationalDistance,
        Multimethod,
        RandomGenerator,
        Real,
        Solution,
        Spacing,
        Subset,
        TournamentSelector,
        Variator,
    )  # @UnresolvedImport
    from platypus import Problem as PlatypusProblem


except ImportError:
    warnings.warn(
        "Platypus based optimization not available. Install with `pip install platypus-opt`",
        ImportWarning,
        stacklevel=2,
    )

    # stub mirroring platypus.Problem; used as base class of Problem below
    class PlatypusProblem:
        constraints = []

        def __init__(self, *args, **kwargs):
            pass

    # stub mirroring platypus.Variator; base class of CombinedVariator below
    class Variator:
        def __init__(self, *args, **kwargs):
            pass

    class RandomGenerator:
        def __call__(self, *args, **kwargs):
            pass

    class TournamentSelector:
        def __init__(self, *args, **kwargs):
            pass

        def __call__(self, *args, **kwargs):
            pass

    class EpsilonProgressContinuation:
        pass

    # remaining names referenced elsewhere in this module are nulled out
    EpsNSGAII = None
    platypus = None
    Real = Integer = Subset = None
96

97
# Created on 5 Jun 2017
98
#
99
# .. codeauthor::jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>
100

101
# explicit public API of this module
__all__ = [
    "ArchiveLogger",
    "Convergence",
    "EpsilonIndicatorMetric",
    "EpsilonProgress",
    "GenerationalDistanceMetric",
    "HypervolumeMetric",
    "InvertedGenerationalDistanceMetric",
    "OperatorProbabilities",
    "Problem",
    "RobustProblem",
    "SpacingMetric",
    "epsilon_nondominated",
    "rebuild_platypus_population",
    "to_problem",
    "to_robust_problem",
]
# module-level logger following workbench logging conventions
_logger = get_module_logger(__name__)
119

120

121
class Problem(PlatypusProblem):
    """Platypus Problem enriched with workbench metadata.

    Carries the workbench parameter objects, the outcome names, the
    workbench constraints, and the kind of search being performed.
    """

    @property
    def parameter_names(self):
        """Return the names of the decision variables."""
        return [parameter.name for parameter in self.parameters]

    def __init__(
        self, searchover, parameters, outcome_names, constraints, reference=None
    ):
        """Init."""
        constraints = [] if constraints is None else constraints

        super().__init__(len(parameters), len(outcome_names), nconstrs=len(constraints))
        assert searchover in ("levers", "uncertainties", "robust")

        # a reference Sample only makes sense when searching over a single
        # dimension of the model (levers or uncertainties)
        if searchover in ("levers", "uncertainties"):
            assert not reference or isinstance(reference, Sample)
        else:
            assert not reference

        self.searchover = searchover
        self.parameters = parameters
        self.outcome_names = outcome_names
        self.ema_constraints = constraints
        self.constraint_names = [constraint.name for constraint in constraints]
        self.reference = reference if reference else 0
155

156

157
class RobustProblem(Problem):
    """Small extension to Problem object for robust optimization.

    adds the scenarios and the robustness functions
    """

    def __init__(
        self, parameters, outcome_names, scenarios, robustness_functions, constraints
    ):
        """Init.

        Parameters
        ----------
        parameters : collection of workbench parameters (the levers)
        outcome_names : list of str
        scenarios : collection of scenarios over which robustness is evaluated
        robustness_functions : collection of ScalarOutcomes, one per outcome name
        constraints : list or None
        """
        super().__init__("robust", parameters, outcome_names, constraints)
        # each objective must have exactly one robustness function
        assert len(robustness_functions) == len(outcome_names)
        self.scenarios = scenarios
        self.robustness_functions = robustness_functions
171

172

173
def to_problem(
    model: AbstractModel,
    searchover: str,
    reference: Sample | None = None,
    constraints=None,
):
    """Helper function to create Problem object.

    Parameters
    ----------
    model : AbstractModel instance
    searchover : str
    reference : Sample instance, optional
                overwrite the default scenario in case of searching over
                levers, or default policy in case of searching over
                uncertainties
    constraints : list, optional

    Returns
    -------
    Problem instance

    Raises
    ------
    EMAError
        if all outcomes on the model are of kind INFO

    """
    decision_variables = determine_objects(model, searchover, union=True)

    # only outcomes that are to be minimized or maximized can be optimized over
    outcomes = [
        outcome
        for outcome in determine_objects(model, "outcomes")
        if outcome.kind != AbstractOutcome.INFO
    ]
    if not outcomes:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )
    outcome_names = [outcome.name for outcome in outcomes]

    problem = Problem(
        searchover, decision_variables, outcome_names, constraints, reference=reference
    )
    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in outcomes]
    problem.constraints[:] = "==0"

    return problem
216

217

218
def to_robust_problem(model, scenarios, robustness_functions, constraints=None):
    """Helper function to create RobustProblem object.

    Parameters
    ----------
    model : AbstractModel instance
    scenarios : collection
    robustness_functions : iterable of ScalarOutcomes
    constraints : list, optional

    Returns
    -------
    RobustProblem instance

    Raises
    ------
    EMAError
        if all robustness functions are of kind INFO

    """
    # robust optimization always searches over the levers
    decision_variables = determine_objects(model, "levers", union=True)

    # only robustness functions that are minimized or maximized are objectives
    outcomes = [
        outcome
        for outcome in robustness_functions
        if outcome.kind != AbstractOutcome.INFO
    ]
    if not outcomes:
        raise EMAError(
            "No outcomes specified to optimize over, all outcomes are of kind=INFO"
        )
    outcome_names = [outcome.name for outcome in outcomes]

    problem = RobustProblem(
        decision_variables, outcome_names, scenarios, robustness_functions, constraints
    )

    problem.types[:] = to_platypus_types(decision_variables)
    problem.directions[:] = [outcome.kind for outcome in outcomes]
    problem.constraints[:] = "==0"

    return problem
254

255

256
def to_platypus_types(decision_variables):
    """Helper function for mapping from workbench parameter types to platypus parameter types."""
    # TODO:: should categorical not be platypus.Subset, with size == 1?
    type_mapping = {
        RealParameter: platypus.Real,
        IntegerParameter: platypus.Integer,
        CategoricalParameter: platypus.Subset,
        BooleanParameter: platypus.Subset,
    }

    def _convert(parameter):
        # categorical-like parameters become a size-1 Subset over their
        # categories; all other parameters map onto a bounded platypus type
        klass = type_mapping[type(parameter)]
        if isinstance(parameter, (CategoricalParameter | BooleanParameter)):
            return klass(parameter.categories, 1)
        return klass(parameter.lower_bound, parameter.upper_bound)

    return [_convert(dv) for dv in decision_variables]
277

278

279
def to_dataframe(solutions, dvnames, outcome_names):
    """Helper function to turn a collection of platypus Solution instances into a pandas DataFrame.

    Parameters
    ----------
    solutions : collection of Solution instances
    dvnames : list of str
    outcome_names : list of str

    Returns
    -------
    pandas DataFrame
        one row per unique solution, with decision variable columns
        followed by outcome columns
    """
    results = []
    # platypus.unique removes duplicate solutions before tabulation
    for solution in platypus.unique(solutions):
        # decode the platypus-internal representation back into workbench values
        # (renamed from `vars`, which shadowed the builtin)
        variables = transform_variables(solution.problem, solution.variables)

        # build the row in a single dict instead of copy-and-update
        row = dict(zip(dvnames, variables))
        row.update(zip(outcome_names, solution.objectives))
        results.append(row)

    return pd.DataFrame(results, columns=dvnames + outcome_names)
308

309

310
def process_uncertainties(jobs):
    """Helper function to map jobs generated by platypus to Scenario objects.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem

    # one Sample per job, named by its position in the job list
    scenarios = [
        Sample(name=str(i), **entry)
        for i, entry in enumerate(_process(jobs, problem))
    ]

    # the reference (default policy) is stored on the problem itself
    policies = problem.reference
    return scenarios, policies
334

335

336
def process_levers(jobs):
    """Helper function to map jobs generated by platypus to Sample instances.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    problem = jobs[0].solution.problem

    # one Sample per job, named by its position in the job list
    policies = [
        Sample(name=str(i), **entry)
        for i, entry in enumerate(_process(jobs, problem))
    ]

    # the reference (default scenario) is stored on the problem itself
    scenarios = problem.reference
    return scenarios, policies
359

360

361
def _process(jobs, problem):
    """Helper function to transform platypus job to dict with correct values for workbench."""
    processed = []
    for job in jobs:
        decoded = transform_variables(problem, job.solution.variables)
        # categorical-like values come wrapped in an object carrying a
        # `.value` attribute; unwrap when present, keep as-is otherwise
        entry = {
            parameter.name: getattr(value, "value", value)
            for parameter, value in zip(problem.parameters, decoded)
        }
        processed.append(entry)
    return processed
375

376

377
def process_robust(jobs):
    """Helper function to process robust optimization jobs.

    Parameters
    ----------
    jobs : collection

    Returns
    -------
    scenarios, policies

    """
    # the policies are decoded exactly as in the levers case; the scenarios
    # over which each policy is evaluated are fixed on the problem itself
    _, policies = process_levers(jobs)
    scenarios = jobs[0].solution.problem.scenarios

    return scenarios, policies
393

394

395
def transform_variables(problem, variables):
    """Helper function for transforming platypus variables.

    Parameters
    ----------
    problem : PlatypusProblem instance
    variables : collection of platypus-encoded variables

    Returns
    -------
    list with the decoded value for each variable
    """
    converted_vars = []
    # renamed loop variables: the original shadowed the builtin `type`
    for platypus_type, encoded in zip(problem.types, variables):
        value = platypus_type.decode(encoded)
        try:
            # Subset-style types decode to a (size 1) sequence; unwrap it
            value = value[0]
        except TypeError:
            # scalar types (Real, Integer) are not subscriptable
            pass

        converted_vars.append(value)
    return converted_vars
407

408

409
def evaluate(jobs_collection, experiments, outcomes, problem):
    """Helper function for mapping the results from perform_experiments back to what platypus needs.

    For every (entry, job) pair, looks up the experiment row produced for
    that entry, collects its outcome values and constraint scores, and
    installs them as the job's evaluation function before triggering
    ``job.solution.evaluate()``.

    Parameters
    ----------
    jobs_collection : iterable of (entry, job) tuples
    experiments : DataFrame of the performed experiments
    outcomes : dict mapping outcome name to array of values
    problem : Problem instance
    """
    searchover = problem.searchover
    outcome_names = problem.outcome_names
    constraints = problem.ema_constraints

    # each job's results are identified by the name of the policy (when
    # searching over levers) or the scenario (when searching uncertainties)
    column = "policy" if searchover == "levers" else "scenario"

    for entry, job in jobs_collection:
        # boolean mask selecting the experiment row(s) belonging to this job
        logical = experiments[column] == entry.name

        job_outputs = {}
        for k, v in outcomes.items():
            job_outputs[k] = v[logical][0]

        # TODO:: only retain uncertainties
        job_experiment = experiments[logical]
        job_constraints = _evaluate_constraints(
            job_experiment, job_outputs, constraints
        )
        job_outcomes = [job_outputs[key] for key in outcome_names]

        # platypus evaluates solutions by calling problem.function; here the
        # results are already known, so a lambda simply returning them is
        # installed. Default arguments freeze this iteration's values so the
        # late-binding closure pitfall is avoided.
        if job_constraints:
            job.solution.problem.function = (
                lambda _, job_outcomes=job_outcomes, job_constraints=job_constraints: (
                    job_outcomes,
                    job_constraints,
                )
            )
        else:
            job.solution.problem.function = (
                lambda _, job_outcomes=job_outcomes: job_outcomes
            )
        job.solution.evaluate()
443

444

445
def evaluate_robust(jobs_collection, experiments, outcomes, problem):
    """Helper function for mapping the results from perform_experiments back to what Platypus needs.

    For every (entry, job) pair, applies the robustness functions over the
    outcomes of all scenarios run for that policy, evaluates the
    constraints, and installs the resulting scores as the job's evaluation
    function before triggering ``job.solution.evaluate()``.

    Parameters
    ----------
    jobs_collection : iterable of (entry, job) tuples
    experiments : DataFrame of the performed experiments
    outcomes : dict mapping outcome name to array of values
    problem : RobustProblem instance
    """
    robustness_functions = problem.robustness_functions
    constraints = problem.ema_constraints

    for entry, job in jobs_collection:
        # mask selecting all experiments (one per scenario) for this policy
        logical = experiments["policy"] == entry.name

        job_outcomes_dict = {}
        job_outcomes = []
        for rf in robustness_functions:
            # each robustness function aggregates the per-scenario values of
            # its input variables into a single robustness score
            data = [outcomes[var_name][logical] for var_name in rf.variable_name]
            score = rf.function(*data)
            job_outcomes_dict[rf.name] = score
            job_outcomes.append(score)

        # TODO:: only retain levers
        job_experiment = experiments[logical].iloc[0]
        job_constraints = _evaluate_constraints(
            job_experiment, job_outcomes_dict, constraints
        )

        # install the precomputed scores as the evaluation function; default
        # arguments freeze this iteration's values (late-binding closures)
        if job_constraints:
            job.solution.problem.function = (
                lambda _, job_outcomes=job_outcomes, job_constraints=job_constraints: (
                    job_outcomes,
                    job_constraints,
                )
            )
        else:
            job.solution.problem.function = (
                lambda _, job_outcomes=job_outcomes: job_outcomes
            )

        job.solution.evaluate()
480

481

482
def _evaluate_constraints(job_experiment, job_outcomes, constraints):
1✔
483
    """Helper function for evaluating the constraints for a given job."""
484
    job_constraints = []
×
485
    for constraint in constraints:
×
486
        data = [job_experiment[var] for var in constraint.parameter_names]
×
487
        data += [job_outcomes[var] for var in constraint.outcome_names]
×
488
        constraint_value = constraint.process(data)
×
489
        job_constraints.append(constraint_value)
×
490
    return job_constraints
×
491

492

493
class AbstractConvergenceMetric(abc.ABC):
    """Base convergence metric class."""

    def __init__(self, name):
        """Init.

        Parameters
        ----------
        name : str
               name under which the metric's results are reported
        """
        super().__init__()
        self.name = name
        # one entry is appended per invocation of the metric
        self.results = []

    @abc.abstractmethod
    def __call__(self, optimizer):
        """Call the convergence metric."""

    def reset(self):
        """Discard any previously collected results."""
        self.results = []

    def get_results(self):
        """Return the collected results."""
        return self.results
511

512

513
class EpsilonProgress(AbstractConvergenceMetric):
    """Epsilon progress convergence metric class.

    Tracks the number of improvements made to the optimizer's epsilon
    archive over the course of the run.
    """

    def __init__(self):
        """Init."""
        super().__init__("epsilon_progress")

    def __call__(self, optimizer):  # noqa: D102
        # the archive counts how often an added solution improved it
        self.results.append(optimizer.archive.improvements)
522

523

524
class MetricWrapper:
    """Wrapper class for wrapping platypus indicators.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    kwargs : dict
             any additional keyword arguments to be passed
             on to the wrapper platypus indicator class

    Notes
    -----
    this class relies on multi-inheritance and careful consideration
    of the MRO to conveniently wrap the convergence metrics provided
    by platypus.

    """

    def __init__(self, reference_set, problem, **kwargs):
        self.problem = problem
        # convert the DataFrame into platypus Solutions before handing it
        # on to the wrapped indicator's __init__
        super().__init__(
            reference_set=rebuild_platypus_population(reference_set, problem), **kwargs
        )

    def calculate(self, archive):
        # likewise convert the archive DataFrame before delegating
        return super().calculate(rebuild_platypus_population(archive, self.problem))
551

552

553
class HypervolumeMetric(MetricWrapper, Hypervolume):
    """Hypervolume metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance

    this is a thin wrapper around Hypervolume as provided
    by platypus to make it easier to use in conjunction with the
    workbench. All behavior is inherited via MetricWrapper and the
    platypus indicator; this class only fixes the MRO.

    """
566

567

568
class GenerationalDistanceMetric(MetricWrapper, GenerationalDistance):
    """GenerationalDistance metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the generational distance function


    This is a thin wrapper around GenerationalDistance as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    see https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information

    """
587

588

589
class InvertedGenerationalDistanceMetric(MetricWrapper, InvertedGenerationalDistance):
    """InvertedGenerationalDistance metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance
    d : int, default=1
        the power in the inverted generational distance function


    This is a thin wrapper around InvertedGenerationalDistance as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    see https://link.springer.com/content/pdf/10.1007/978-3-319-15892-1_8.pdf
    for more information

    """
608

609

610
class EpsilonIndicatorMetric(MetricWrapper, EpsilonIndicator):
    """EpsilonIndicator metric.

    Parameters
    ----------
    reference_set : DataFrame
    problem : PlatypusProblem instance


    this is a thin wrapper around EpsilonIndicator as provided
    by platypus to make it easier to use in conjunction with the
    workbench. All behavior is inherited via MetricWrapper and the
    platypus indicator; this class only fixes the MRO.

    """
624

625

626
class SpacingMetric(MetricWrapper, Spacing):
    """Spacing metric.

    Parameters
    ----------
    problem : PlatypusProblem instance


    this is a thin wrapper around Spacing as provided
    by platypus to make it easier to use in conjunction with the
    workbench.

    """

    def __init__(self, problem):
        # NOTE(review): this deliberately skips MetricWrapper.__init__,
        # presumably because spacing requires no reference set — only the
        # problem is needed to rebuild the population in calculate()
        self.problem = problem
642

643

644
class HyperVolume(AbstractConvergenceMetric):
    """Hypervolume convergence metric class.

    This metric is derived from a hyper-volume measure, which describes the
    multi-dimensional volume of space contained within the pareto front. When
    computed with minimum and maximums, it describes the ratio of dominated
    outcomes to all possible outcomes in the extent of the space.  Getting this
    number to be high or low is not necessarily important, as not all outcomes
    within the min-max range will be feasible.  But, having the hypervolume remain
    fairly stable over multiple generations of the evolutionary algorithm provides
    an indicator of convergence.

    Parameters
    ----------
    minimum : numpy array
    maximum : numpy array


    This class is deprecated and will be removed in version 3.0 of the EMAworkbench.
    Use ArchiveLogger instead and calculate hypervolume in post using HypervolumeMetric
    as also shown in the directed search tutorial.

    """

    def __init__(self, minimum, maximum):
        super().__init__("hypervolume")
        warnings.warn(
            "HyperVolume is deprecated and will be removed in version 3.0 of the EMAworkbench."
            "Use ArchiveLogger and HypervolumeMetric instead",
            DeprecationWarning,
            stacklevel=2,
        )
        # delegate the actual computation to the platypus indicator
        self.hypervolume_func = Hypervolume(minimum=minimum, maximum=maximum)

    def __call__(self, optimizer):
        self.results.append(self.hypervolume_func.calculate(optimizer.archive))

    @classmethod
    def from_outcomes(cls, outcomes):
        """Alternate constructor taking the expected ranges from the outcomes."""
        ranges = [
            outcome.expected_range for outcome in outcomes if outcome.kind != outcome.INFO
        ]
        # transpose the per-outcome (min, max) pairs into two vectors
        minimum, maximum = np.asarray(list(zip(*ranges)))
        return cls(minimum, maximum)
686

687

688
class ArchiveLogger(AbstractConvergenceMetric):
    """Helper class to write the archive to disk at each iteration.

    Archives are written as one csv per convergence check into a temporary
    directory, which is packed into a single tar.gz file when results are
    collected.

    Parameters
    ----------
    directory : str
    decision_varnames : list of str
    outcome_varnames : list of str
    base_filename : str, optional
    """

    def __init__(
        self,
        directory,
        decision_varnames,
        outcome_varnames,
        base_filename="archives.tar.gz",
    ):
        """Init."""
        super().__init__("archive_logger")

        self.directory = os.path.abspath(directory)
        # intermediate csv files live in <directory>/tmp until get_results
        self.temp = os.path.join(self.directory, "tmp")
        os.makedirs(self.temp, exist_ok=True)

        self.base = base_filename
        self.decision_varnames = decision_varnames
        self.outcome_varnames = outcome_varnames
        self.tarfilename = os.path.join(self.directory, base_filename)

        # self.index = 0

    def __call__(self, optimizer):  # noqa: D102
        # snapshot the current archive, keyed by the number of function
        # evaluations performed so far
        archive = to_dataframe(
            optimizer.result, self.decision_varnames, self.outcome_varnames
        )
        archive.to_csv(os.path.join(self.temp, f"{optimizer.nfe}.csv"), index=False)

    def reset(self):  # noqa: D102
        # FIXME what needs to go here?
        pass

    def get_results(self):  # noqa: D102
        # pack all per-nfe csv files into a single tar.gz and remove the
        # temporary directory; the data is returned via load_archives instead
        with tarfile.open(self.tarfilename, "w:gz") as z:
            z.add(self.temp, arcname=os.path.basename(self.temp))

        shutil.rmtree(self.temp)
        return None

    @classmethod
    def load_archives(cls, filename):
        """Load the archives stored with the ArchiveLogger.

        Parameters
        ----------
        filename : str
                   relative path to file

        Returns
        -------
        dict with nfe as key and dataframe as value
        """
        archives = {}
        with tarfile.open(os.path.abspath(filename)) as fh:
            for entry in fh.getmembers():
                if entry.name.endswith("csv"):
                    # entries are stored as "<tmp-dir>/<nfe>.csv"; strip the
                    # directory prefix and the ".csv" suffix to recover nfe
                    key = entry.name.split("/")[1][:-4]
                    archives[int(key)] = pd.read_csv(fh.extractfile(entry))
        return archives
757

758

759
class OperatorProbabilities(AbstractConvergenceMetric):
    """OperatorProbabiliy convergence tracker for use with auto adaptive operator selection.

    Parameters
    ----------
    name : str
    index : int


    State of the art MOEAs like Borg (and GenerationalBorg provided by the workbench)
    use autoadaptive operator selection. The algorithm has multiple different evolutionary
    operators. Over the run, it tracks how well each operator is doing in producing fitter
    offspring. The probability of the algorithm using a given evolutionary operator is
    proportional to how well this operator has been doing in producing fitter offspring in
    recent generations. This class can be used to track these probabilities over the
    run of the algorithm.

    """

    def __init__(self, name, index):
        super().__init__(name)
        # position of the tracked operator in the variator's probability list
        self.index = index

    def __call__(self, optimizer):  # noqa: D102
        try:
            self.results.append(
                optimizer.algorithm.variator.probabilities[self.index]
            )
        except AttributeError:
            # the variator is not autoadaptive; nothing to record
            pass
788

789

790
def epsilon_nondominated(results, epsilons, problem):
    """Merge the list of results into a single set of non dominated results using the provided epsilon values.

    Parameters
    ----------
    results : list of DataFrames
    epsilons : epsilon values for each objective
    problem : PlatypusProblem instance

    Returns
    -------
    DataFrame

    Notes
    -----
    this is a platypus based alternative to pareto.py (https://github.com/matthewjwoodruff/pareto.py)
    """
    if len(epsilons) != problem.nobjs:
        raise ValueError(
            f"The number of epsilon values ({len(epsilons)}) must match the number of objectives {problem.nobjs}"
        )

    # stack all result sets, convert them to platypus Solutions, and let an
    # epsilon box archive keep only the epsilon-nondominated ones
    combined = pd.concat(results, ignore_index=True)
    archive = EpsilonBoxArchive(epsilons)
    archive += rebuild_platypus_population(combined, problem)

    return to_dataframe(archive, problem.parameter_names, problem.outcome_names)
818

819

820
class Convergence(ProgressTrackingMixIn):
    """helper class for tracking convergence of optimization."""

    # names of the metrics shipped with the workbench
    valid_metrics = {"hypervolume", "epsilon_progress", "archive_logger"}

    def __init__(
        self,
        metrics,
        max_nfe,
        convergence_freq=1000,
        logging_freq=5,
        log_progress=False,
    ):
        """Init.

        Parameters
        ----------
        metrics : list of AbstractConvergenceMetric instances, or None
        max_nfe : int
        convergence_freq : int, optional
                           minimum number of nfe between convergence checks
        logging_freq : int, optional
        log_progress : bool, optional
        """
        super().__init__(
            max_nfe,
            logging_freq,
            _logger,
            log_progress=log_progress,
            log_func=lambda self: f"generation"
            f" {self.generation}, {self.i}/{self.max_nfe}",
        )

        self.max_nfe = max_nfe
        # incremented on every call; starts at -1 so the first call is gen 0
        self.generation = -1
        # nfe values at which convergence information was stored
        self.index = []
        self.last_check = 0

        if metrics is None:
            metrics = []

        self.metrics = metrics
        self.convergence_freq = convergence_freq
        self.logging_freq = logging_freq

        # TODO what is the point of this code?
        for metric in metrics:
            assert isinstance(metric, AbstractConvergenceMetric)
            metric.reset()

    def __call__(self, optimizer, force=False):
        """Stores convergences information given specified convergence frequency.

        Parameters
        ----------
        optimizer : platypus optimizer instance
        force : boolean, optional
                if True, convergence information will always be stored
                if False, converge information will be stored if the
                the number of nfe since the last time of storing is equal to
                or higher then convergence_freq


        the primary use case for force is to force convergence frequency information
        to be stored once the stopping condition of the optimizer has been reached
        so that the final convergence information is kept.

        """
        nfe = optimizer.nfe
        # report only the nfe performed since the previous call to the mixin
        super().__call__(nfe - self.i)

        self.generation += 1

        # store when the convergence_freq interval has elapsed, on the very
        # first check (last_check == 0), or when explicitly forced
        if (
            (nfe >= self.last_check + self.convergence_freq)
            or (self.last_check == 0)
            or force
        ):
            self.index.append(nfe)
            self.last_check = nfe

            for metric in self.metrics:
                metric(optimizer)

    def to_dataframe(self):  # noqa: D102
        # metrics without results (e.g. ArchiveLogger returns None) are
        # dropped from the dataframe via the walrus-filtered comprehension
        progress = {
            metric.name: result
            for metric in self.metrics
            if (result := metric.get_results())
        }

        progress = pd.DataFrame.from_dict(progress)

        if not progress.empty:
            progress["nfe"] = self.index

        return progress
907

908

909
def rebuild_platypus_population(archive, problem):
    """Rebuild a population of platypus Solution instances.

    Parameters
    ----------
    archive : DataFrame
    problem : PlatypusProblem instance

    Returns
    -------
    list of platypus Solutions

    Raises
    ------
    EMAError
        if the archive's columns do not cover exactly the problem's
        decision variables and objectives

    """
    expected_columns = problem.nvars + problem.nobjs
    actual_columns = len(archive.columns)

    if actual_columns != expected_columns:
        raise EMAError(
            f"The number of columns in the archive ({actual_columns}) does not match the "
            f"expected number of decision variables and objectives ({expected_columns})."
        )

    solutions = []
    for row in archive.itertuples():
        try:
            decision_variables = [
                getattr(row, attr) for attr in problem.parameter_names
            ]
        except AttributeError as e:
            missing_parameters = [
                attr for attr in problem.parameter_names if not hasattr(row, attr)
            ]
            raise EMAError(
                f"Parameter names {missing_parameters} not found in archive"
            ) from e

        try:
            objectives = [getattr(row, attr) for attr in problem.outcome_names]
        except AttributeError as e:
            missing_outcomes = [
                attr for attr in problem.outcome_names if not hasattr(row, attr)
            ]
            # fixed: message previously ended with a stray apostrophe
            raise EMAError(
                f"Outcome names {missing_outcomes} not found in archive"
            ) from e

        # re-encode the plain values into platypus's internal representation
        solution = Solution(problem)
        solution.variables[:] = [
            platypus_type.encode(value)
            for platypus_type, value in zip(problem.types, decision_variables)
        ]
        solution.objectives[:] = objectives
        solutions.append(solution)
    return solutions
963

964

965
class CombinedVariator(Variator):
    """Variator for problems with mixed decision-variable types.

    Platypus' stock operators assume homogeneous decision variables. This
    variator dispatches, per decision variable, to a type-appropriate
    operator via the ``_crossover`` and ``_mutate`` class-level dispatch
    tables:

    * ``Real``    : SBX crossover / polynomial mutation
    * ``Integer`` : HUX crossover / bit-flip mutation
    * ``Subset``  : SSX crossover / replace mutation

    Parameters
    ----------
    crossover_prob : float, optional
                     per-variable probability of applying crossover
    mutation_prob : float, optional
                    per-variable probability of applying mutation

    """

    def __init__(self, crossover_prob=0.5, mutation_prob=1):
        # arity 2: this variator consumes a pair of parents
        super().__init__(2)
        # reuse platypus' SBX implementation for the real-valued case
        self.SBX = platypus.SBX()
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob

    def evolve(self, parents):
        """Return two children produced by per-variable crossover and mutation.

        Parameters
        ----------
        parents : sequence of two platypus Solution instances

        Returns
        -------
        list of two platypus Solution instances

        """
        child1 = copy.deepcopy(parents[0])
        child2 = copy.deepcopy(parents[1])
        problem = child1.problem

        # crossover: each decision variable is crossed over independently,
        # using the operator registered for its platypus type
        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.crossover_prob:
                klass = kind.__class__
                child1, child2 = self._crossover[klass](self, child1, child2, i, kind)
                # children changed, so any cached evaluation is stale
                child1.evaluated = False
                child2.evaluated = False

        # mutate both children in place
        for child in [child1, child2]:
            self.mutate(child)

        return [child1, child2]

    def mutate(self, child):
        """Mutate ``child`` in place, one decision variable at a time."""
        problem = child.problem

        for i, kind in enumerate(problem.types):  # @ReservedAssignment
            if random.random() <= self.mutation_prob:
                klass = kind.__class__
                child = self._mutate[klass](self, child, i, kind)
                child.evaluated = False

    def crossover_real(self, child1, child2, i, type):  # @ReservedAssignment
        # simulated binary crossover (SBX) on decision variable i
        x1 = float(child1.variables[i])
        x2 = float(child2.variables[i])
        lb = type.min_value
        ub = type.max_value

        x1, x2 = self.SBX.sbx_crossover(x1, x2, lb, ub)

        child1.variables[i] = x1
        child2.variables[i] = x2

        return child1, child2

    def crossover_integer(self, child1, child2, i, type):  # @ReservedAssignment
        # half-uniform crossover (HUX) on the bit encoding of variable i:
        # each differing bit is swapped with probability 0.5
        for j in range(type.nbits):
            if child1.variables[i][j] != child2.variables[i][j]:  # noqa: SIM102
                if bool(random.getrandbits(1)):
                    child1.variables[i][j] = not child1.variables[i][j]
                    child2.variables[i][j] = not child2.variables[i][j]
        return child1, child2

    def crossover_categorical(self, child1, child2, i, type):  # @ReservedAssignment
        # subset crossover (SSX); exchanges elements unique to each parent
        # with probability 0.5
        s1 = set(child1.variables[i])
        s2 = set(child2.variables[i])

        for j in range(type.size):
            if (
                (child2.variables[i][j] not in s1)
                and (child1.variables[i][j] not in s2)
                and (random.random() < 0.5)
            ):
                temp = child1.variables[i][j]
                child1.variables[i][j] = child2.variables[i][j]
                child2.variables[i][j] = temp

        return child1, child2

    def mutate_real(self, child, i, type, distribution_index=20):  # @ReservedAssignment
        # polynomial mutation (PM) on decision variable i
        x = child.variables[i]
        lower = type.min_value
        upper = type.max_value

        u = random.random()
        dx = upper - lower

        if u < 0.5:
            bl = (x - lower) / dx
            b = 2.0 * u + (1.0 - 2.0 * u) * pow(1.0 - bl, distribution_index + 1.0)
            delta = pow(b, 1.0 / (distribution_index + 1.0)) - 1.0
        else:
            bu = (upper - x) / dx
            b = 2.0 * (1.0 - u) + 2.0 * (u - 0.5) * pow(
                1.0 - bu, distribution_index + 1.0
            )
            delta = 1.0 - pow(b, 1.0 / (distribution_index + 1.0))

        x = x + delta * dx
        # clamp to the variable's bounds
        x = max(lower, min(x, upper))

        child.variables[i] = x
        return child

    def mutate_integer(self, child, i, type, probability=1):  # @ReservedAssignment
        # bit-flip mutation on the bit encoding of variable i
        for j in range(type.nbits):
            if random.random() <= probability:
                child.variables[i][j] = not child.variables[i][j]
        return child

    def mutate_categorical(self, child, i, type):  # @ReservedAssignment
        # replace mutation: with probability 1/size, swap one member of the
        # subset for a randomly chosen non-member
        probability = 1 / type.size

        if random.random() <= probability:
            subset = child.variables[i]

            if len(subset) < len(type.elements):
                j = random.randrange(len(subset))

                nonmembers = list(set(type.elements) - set(subset))
                k = random.randrange(len(nonmembers))
                subset[j] = nonmembers[k]

            child.variables[i] = subset

        return child

    # dispatch tables mapping platypus types to the bound operators above
    _crossover = {
        Real: crossover_real,
        Integer: crossover_integer,
        Subset: crossover_categorical,
    }

    _mutate = {
        Real: mutate_real,
        Integer: mutate_integer,
        Subset: mutate_categorical,
    }
1110

1111

1112
def _optimize(
    problem: PlatypusProblem,
    evaluator: "BaseEvaluator",  # noqa: F821
    algorithm: type[Algorithm],
    convergence: Iterable[Callable],
    nfe: int,
    convergence_freq: int,
    logging_freq: int,
    variator: Variator = None,
    **kwargs,
):
    """Run a platypus algorithm and return results (and convergence metrics).

    Validates any supplied epsilon values, selects a suitable variator for
    mixed-type problems, runs the optimizer for ``nfe`` function
    evaluations while tracking convergence, and returns the archive as a
    DataFrame (plus a convergence DataFrame when metrics were collected).
    """
    # epsilons, when given, must provide one value per outcome
    if "epsilons" in kwargs and len(kwargs["epsilons"]) != len(problem.outcome_names):
        raise EMAError("Number of epsilon values does not match number of outcomes")

    if variator is None:
        # homogeneous problems can rely on platypus' default operator
        # selection; mixed-type problems need the combined variator
        first_kind = problem.types[0].__class__
        if not all(isinstance(t, first_kind) for t in problem.types):
            variator = CombinedVariator()

    optimizer = algorithm(
        problem, evaluator=evaluator, variator=variator, log_frequency=500, **kwargs
    )

    # hook convergence tracking into the evaluator so metrics are collected
    # while the optimizer runs
    progress = Convergence(
        convergence, nfe, convergence_freq=convergence_freq, logging_freq=logging_freq
    )
    evaluator.callback = functools.partial(progress, optimizer)

    with temporary_filter(name=[callbacks.__name__, evaluators.__name__], level=INFO):
        optimizer.run(nfe)

    # force a final convergence measurement at the end of the run
    progress(optimizer, force=True)

    results = to_dataframe(
        optimizer.result, problem.parameter_names, problem.outcome_names
    )
    metrics = progress.to_dataframe()

    _logger.info(
        "optimization completed, found {} solutions".format(len(optimizer.archive))
    )

    if metrics.empty:
        return results
    return results, metrics
×
1171

1172

1173
class BORGDefaultDescriptor:
    """Descriptor for Borg operator defaults that depend on problem size.

    The value is computed lazily on every access by applying
    ``default_function`` to the number of decision variables of the owning
    instance's problem (``instance.problem.nvars``), e.g.
    ``lambda n: 1 / n`` for mutation probabilities.

    Parameters
    ----------
    default_function : callable
                       maps the number of decision variables (int) to the
                       default value

    Notes
    -----
    Defaults are treated as class-level attributes: the computed value is
    never cached on the instance, so it always reflects the current
    ``problem.nvars``.

    """

    def __init__(self, default_function):
        self.default_function = default_function

    def __get__(self, instance, owner):
        # class-level access (instance is None) returns the descriptor
        # itself, per descriptor-protocol convention; the original code
        # crashed with AttributeError in this case
        if instance is None:
            return self
        return self.default_function(instance.problem.nvars)

    def __set_name__(self, owner, name):
        # remember the attribute name this descriptor is bound to
        self.name = name
1✔
1186

1187

1188
class GenerationalBorg(EpsilonProgressContinuation):
    """Generational variant of the BORG framework.

    Combines Epsilon Progress Continuation and auto-adaptive operator
    selection (Multimethod) with the generational NSGAII algorithm,
    instead of the steady-state scheme used by BORG proper.

    All operator parameters default to the values used in Borg 1.9.

    Note:: limited to RealParameters only.

    """

    pm_p = BORGDefaultDescriptor(lambda x: 1 / x)
    pm_dist = 20

    sbx_prop = 1
    sbx_dist = 15

    de_rate = 0.1
    de_stepsize = 0.5

    um_p = BORGDefaultDescriptor(lambda x: 1 / x)

    spx_nparents = 10
    spx_noffspring = 2
    spx_expansion = 0.3

    pcx_nparents = 10
    pcx_noffspring = 2
    pcx_eta = 0.1
    pcx_zeta = 0.1

    undx_nparents = 10
    undx_noffspring = 2
    undx_zeta = 0.5
    undx_eta = 0.35

    def __init__(
        self,
        problem,
        epsilons,
        population_size=100,
        generator=RandomGenerator(),  # noqa: B008
        selector=TournamentSelector(2),  # noqa: B008
        variator=None,
        **kwargs,
    ):
        """Init."""
        self.problem = problem

        # Parameterization taken from
        # Borg: An Auto-Adaptive MOEA Framework - Hadka, Reed
        def with_pm(operator):
            # pair a crossover operator with polynomial mutation
            return GAOperator(
                operator,
                PM(probability=self.pm_p, distribution_index=self.pm_dist),
            )

        # candidate operators for auto-adaptive multi-operator selection
        variators = [
            with_pm(SBX(probability=self.sbx_prop, distribution_index=self.sbx_dist)),
            with_pm(
                PCX(
                    nparents=self.pcx_nparents,
                    noffspring=self.pcx_noffspring,
                    eta=self.pcx_eta,
                    zeta=self.pcx_zeta,
                )
            ),
            with_pm(
                DifferentialEvolution(
                    crossover_rate=self.de_rate, step_size=self.de_stepsize
                )
            ),
            with_pm(
                UNDX(
                    nparents=self.undx_nparents,
                    noffspring=self.undx_noffspring,
                    zeta=self.undx_zeta,
                    eta=self.undx_eta,
                )
            ),
            with_pm(
                SPX(
                    nparents=self.spx_nparents,
                    noffspring=self.spx_noffspring,
                    expansion=self.spx_expansion,
                )
            ),
            UM(probability=self.um_p),
        ]

        variator = Multimethod(self, variators)

        super().__init__(
            NSGAII(
                problem,
                population_size,
                generator,
                selector,
                variator,
                EpsilonBoxArchive(epsilons),
                **kwargs,
            )
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc