quaquel / EMAworkbench / 6083775233

05 Sep 2023 11:09AM UTC coverage: 80.638% (-0.3%) from 80.915%

Pull Request #292: Prototype of MPIEvaluator for multi-node workloads
github / web-flow: Merge 8abf05c60 into 71e1f9fb3

27 of 27 new or added lines in 1 file covered. (100.0%)
4623 of 5733 relevant lines covered (80.64%)
0.81 hits per line

Source File (47.12% covered): /ema_workbench/em_framework/evaluators.py
"""
collection of evaluators for performing experiments, optimization, and robust
optimization

"""
import enum
import multiprocessing
import numbers
import os
import random
import shutil
import string
import sys
import threading
import warnings

from ema_workbench.em_framework.samplers import AbstractSampler
from .callbacks import DefaultCallback
from .points import experiment_generator, Scenario, Policy
from .ema_multiprocessing import LogQueueReader, initializer, add_tasks
from .experiment_runner import ExperimentRunner
from .model import AbstractModel
from .optimization import (
    evaluate_robust,
    evaluate,
    EpsNSGAII,
    to_problem,
    to_robust_problem,
    process_levers,
    process_uncertainties,
    process_robust,
    _optimize,
)
from .outcomes import ScalarOutcome, AbstractOutcome
from .salib_samplers import SobolSampler, MorrisSampler, FASTSampler
from .samplers import (
    MonteCarloSampler,
    FullFactorialSampler,
    LHSSampler,
    UniformLHSSampler,
    sample_levers,
    sample_uncertainties,
)
from .util import NamedObjectMap, determine_objects
from ..util import EMAError, get_module_logger, ema_logging

warnings.simplefilter("once", ImportWarning)

try:
    from .ema_ipyparallel import (
        start_logwatcher,
        set_engine_logger,
        initialize_engines,
        cleanup,
        _run_experiment,
    )
except (ImportError, ModuleNotFoundError):
    warnings.warn("ipyparallel not installed - IpyparallelEvaluator not available")

# Created on 5 Mar 2017
#
# .. codeauthor::jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>

__all__ = [
    "MultiprocessingEvaluator",
    "IpyparallelEvaluator",
    "optimize",
    "perform_experiments",
    "SequentialEvaluator",
    "Samplers",
]

_logger = get_module_logger(__name__)

class Samplers(enum.Enum):
    """
    Enum for different kinds of samplers
    """

    ## TODO:: have samplers register themselves on class instantiation
    ## TODO:: should not be defined here

    MC = MonteCarloSampler()
    LHS = LHSSampler()
    UNIFORM_LHS = UniformLHSSampler()
    FF = FullFactorialSampler()
    SOBOL = SobolSampler()
    FAST = FASTSampler()
    MORRIS = MorrisSampler()

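# Illustrative usage sketch (not part of this module): a sampling strategy is
# selected by passing a Samplers member to perform_experiments, defined further
# below. `model` is assumed to be an AbstractModel instance defined elsewhere.
#
#     experiments, outcomes = perform_experiments(
#         model, scenarios=1000, uncertainty_sampling=Samplers.SOBOL
#     )
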
class BaseEvaluator:
    """base class for the different evaluators

    Parameters
    ----------
    msis : collection of models
    searchover : {None, 'levers', 'uncertainties'}, optional
                 to be used in combination with platypus

    Raises
    ------
    TypeError

    """

    reporting_frequency = 3

    def __init__(self, msis):
        super().__init__()

        if isinstance(msis, AbstractModel):
            msis = [msis]
        else:
            for entry in msis:
                if not isinstance(entry, AbstractModel):
                    raise TypeError(
                        f"{entry} should be an AbstractModel "
                        f"instance but is a {entry.__class__} instance"
                    )

        self._msis = msis
        self.callback = None

    def __enter__(self):
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finalize()

        if exc_type is not None:
            return False

    def initialize(self):
        """initialize the evaluator"""
        raise NotImplementedError

    def finalize(self):
        """finalize the evaluator"""
        raise NotImplementedError

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        """used by ema_workbench"""
        raise NotImplementedError

    def evaluate_all(self, jobs, **kwargs):
        """makes ema_workbench evaluators compatible with Platypus
        evaluators as used by platypus algorithms
        """
        self.callback()

        try:
            problem = jobs[0].solution.problem
        except IndexError:
            # no jobs to evaluate
            return jobs

        searchover = problem.searchover

        if searchover == "levers":
            scenarios, policies = process_levers(jobs)
            jobs_collection = zip(policies, jobs)
        elif searchover == "uncertainties":
            scenarios, policies = process_uncertainties(jobs)
            jobs_collection = zip(scenarios, jobs)
        elif searchover == "robust":
            scenarios, policies = process_robust(jobs)
            jobs_collection = zip(policies, jobs)
        else:
            raise NotImplementedError()

        # overwrite the default 10 progress reports with self.reporting_frequency reports
        callback = perform_experiments(
            self._msis,
            evaluator=self,
            reporting_frequency=self.reporting_frequency,
            scenarios=scenarios,
            policies=policies,
            return_callback=True,
            log_progress=True,
        )

        experiments, outcomes = callback.get_results()

        if searchover in ("levers", "uncertainties"):
            evaluate(jobs_collection, experiments, outcomes, problem)
        else:
            evaluate_robust(jobs_collection, experiments, outcomes, problem)

        return jobs

    def perform_experiments(
        self,
        scenarios=0,
        policies=0,
        reporting_interval=None,
        reporting_frequency=10,
        uncertainty_union=False,
        lever_union=False,
        outcome_union=False,
        uncertainty_sampling=Samplers.LHS,
        lever_sampling=Samplers.LHS,
        callback=None,
        combine="factorial",
    ):
        """convenience method for performing experiments.

        is forwarded to :func:perform_experiments, with evaluator and
        models arguments added in.

        """
        return perform_experiments(
            self._msis,
            scenarios=scenarios,
            policies=policies,
            evaluator=self,
            reporting_interval=reporting_interval,
            reporting_frequency=reporting_frequency,
            uncertainty_union=uncertainty_union,
            lever_union=lever_union,
            outcome_union=outcome_union,
            uncertainty_sampling=uncertainty_sampling,
            lever_sampling=lever_sampling,
            callback=callback,
            combine=combine,
        )

    def optimize(
        self,
        algorithm=EpsNSGAII,
        nfe=10000,
        searchover="levers",
        reference=None,
        constraints=None,
        convergence_freq=1000,
        logging_freq=5,
        variator=None,
        **kwargs,
    ):
        """convenience method for outcome optimization.

        is forwarded to :func:optimize, with evaluator and models
        arguments added in.

        """
        return optimize(
            self._msis,
            algorithm=algorithm,
            nfe=int(nfe),
            searchover=searchover,
            evaluator=self,
            reference=reference,
            constraints=constraints,
            convergence_freq=convergence_freq,
            logging_freq=logging_freq,
            variator=variator,
            **kwargs,
        )

    def robust_optimize(
        self,
        robustness_functions,
        scenarios,
        algorithm=EpsNSGAII,
        nfe=10000,
        convergence_freq=1000,
        logging_freq=5,
        **kwargs,
    ):
        """convenience method for robust optimization.

        is forwarded to :func:robust_optimize, with evaluator and models
        arguments added in.

        """
        return robust_optimize(
            self._msis,
            robustness_functions,
            scenarios,
            self,
            algorithm=algorithm,
            nfe=nfe,
            convergence_freq=convergence_freq,
            logging_freq=logging_freq,
            **kwargs,
        )

class SequentialEvaluator(BaseEvaluator):
    def initialize(self):
        pass

    def finalize(self):
        pass

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        _logger.info("performing experiments sequentially")

        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

        models = NamedObjectMap(AbstractModel)
        models.extend(self._msis)

        # TODO:: replace with context manager
        cwd = os.getcwd()
        runner = ExperimentRunner(models)

        for experiment in ex_gen:
            outcomes = runner.run_experiment(experiment)
            callback(experiment, outcomes)
        runner.cleanup()
        os.chdir(cwd)

class MultiprocessingEvaluator(BaseEvaluator):
    """evaluator for experiments using a multiprocessing pool

    Parameters
    ----------
    msis : collection of models
    n_processes : int (optional)
                  A negative number can be given to use the number of logical
                  cores minus that number. For example, on a 12 thread processor,
                  -2 results in using 10 threads.
    maxtasksperchild : int (optional)


    Note that the maximum number of available processes is multiprocessing.cpu_count();
    on Windows this can never be higher than 61.

    """

    def __init__(self, msis, n_processes=None, maxtasksperchild=None, **kwargs):
        super().__init__(msis, **kwargs)

        self._pool = None

        # Calculate n_processes if negative value is given
        max_processes = multiprocessing.cpu_count()
        if sys.platform == "win32":
            # on windows the max number of processes is currently
            # still limited to 61
            max_processes = min(max_processes, 61)

        if isinstance(n_processes, int):
            if n_processes > 0:
                if max_processes < n_processes:
                    warnings.warn(f"the number of processes cannot be more than {max_processes}")
                self.n_processes = min(n_processes, max_processes)
            else:
                self.n_processes = max(max_processes + n_processes, 1)
        elif n_processes is None:
            self.n_processes = max_processes
        else:
            raise ValueError("n_processes must be an integer or None")

        self.maxtasksperchild = maxtasksperchild

    def initialize(self):
        log_queue = multiprocessing.Queue()

        log_queue_reader = LogQueueReader(log_queue)
        log_queue_reader.start()

        try:
            loglevel = ema_logging._rootlogger.getEffectiveLevel()
        except AttributeError:
            loglevel = 30

        # check if we need a working directory
        for model in self._msis:
            try:
                model.working_directory
            except AttributeError:
                self.root_dir = None
                break
        else:
            random_part = [random.choice(string.ascii_letters + string.digits) for _ in range(5)]
            random_part = "".join(random_part)
            self.root_dir = os.path.abspath("tmp" + random_part)
            os.makedirs(self.root_dir)

        self._pool = multiprocessing.Pool(
            self.n_processes,
            initializer,
            (self._msis, log_queue, loglevel, self.root_dir),
            self.maxtasksperchild,
        )
        self.n_processes = self._pool._processes
        _logger.info(f"pool started with {self.n_processes} workers")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        _logger.info("terminating pool")

        if exc_type is not None:
            # When an exception is thrown stop accepting new jobs
            # and abort pending jobs without waiting.
            self._pool.terminate()
            return False

        super().__exit__(exc_type, exc_value, traceback)

    def finalize(self):
        # Stop accepting new jobs and wait for pending jobs to finish.
        self._pool.close()
        self._pool.join()

        if self.root_dir:
            shutil.rmtree(self.root_dir)

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
        add_tasks(self.n_processes, self._pool, ex_gen, callback)

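# Illustrative usage sketch (not part of this module; assumes `model` is an
# AbstractModel instance defined elsewhere): per the docstring above, a negative
# n_processes leaves that many logical cores free, and the with-statement takes
# care of starting and tearing down the pool.
#
#     with MultiprocessingEvaluator(model, n_processes=-2) as evaluator:
#         experiments, outcomes = evaluator.perform_experiments(scenarios=1000)
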
class MPIEvaluator(BaseEvaluator):
    """Evaluator for experiments using MPI Pool Executor from mpi4py"""

    def __init__(self, msis, **kwargs):
        # import here so a missing mpi4py is reported when the evaluator is
        # created rather than at module import time
        from mpi4py.futures import MPIPoolExecutor  # noqa: F401

        super().__init__(msis, **kwargs)
        self._pool = None

    def initialize(self):
        # repeated here because the import in __init__ is local to that method
        # and would otherwise not be in scope
        from mpi4py.futures import MPIPoolExecutor

        self._pool = MPIPoolExecutor()
        _logger.info(f"MPI pool started with {self._pool._max_workers} workers")
        return self

    def finalize(self):
        self._pool.shutdown()
        _logger.info("MPI pool has been shut down")

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
        experiments = list(ex_gen)  # Convert generator to list

        # Create the model map just like in SequentialEvaluator
        models = NamedObjectMap(AbstractModel)
        models.extend(self._msis)

        # Pack models with each experiment
        packed = [(experiment, models) for experiment in experiments]

        # Use the pool to execute in parallel
        results = self._pool.map(run_experiment_mpi, packed)

        for experiment, outcomes in results:
            callback(experiment, outcomes)


def run_experiment_mpi(packed_data):
    experiment, all_models = packed_data
    model = all_models[experiment.model_name]

    runner = ExperimentRunner({experiment.model_name: model})
    outcomes = runner.run_experiment(experiment)

    return experiment, outcomes

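# Illustrative usage sketch for the MPIEvaluator prototype (not part of this
# module; assumes `model` is defined elsewhere and mpi4py is installed):
#
#     with MPIEvaluator(model) as evaluator:
#         experiments, outcomes = evaluator.perform_experiments(scenarios=1000)
#
# mpi4py's MPIPoolExecutor is typically launched under MPI itself, for example:
#
#     mpiexec -n 64 python -m mpi4py.futures my_ema_script.py
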
class IpyparallelEvaluator(BaseEvaluator):
    """evaluator for using an ipyparallel pool"""

    def __init__(self, msis, client, **kwargs):
        super().__init__(msis, **kwargs)
        self.client = client

    def initialize(self):
        import ipyparallel

        _logger.debug("starting ipyparallel pool")

        try:
            TIMEOUT_MAX = threading.TIMEOUT_MAX
        except AttributeError:
            TIMEOUT_MAX = 1e10  # noqa
        ipyparallel.client.asyncresult._FOREVER = TIMEOUT_MAX
        # update loggers on all engines
        self.client[:].apply_sync(set_engine_logger)

        _logger.debug("initializing engines")
        initialize_engines(self.client, self._msis, os.getcwd())

        self.logwatcher, self.logwatcher_thread = start_logwatcher()

        _logger.debug("successfully started ipyparallel pool")
        _logger.info("performing experiments using ipyparallel")

        return self

    def finalize(self):
        self.logwatcher.stop()
        cleanup(self.client)

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

        lb_view = self.client.load_balanced_view()
        results = lb_view.map(_run_experiment, ex_gen, ordered=False, block=False)

        for entry in results:
            callback(*entry)

def perform_experiments(
    models,
    scenarios=0,
    policies=0,
    evaluator=None,
    reporting_interval=None,
    reporting_frequency=10,
    uncertainty_union=False,
    lever_union=False,
    outcome_union=False,
    uncertainty_sampling=Samplers.LHS,
    lever_sampling=Samplers.LHS,
    callback=None,
    return_callback=False,
    combine="factorial",
    log_progress=False,
):
    """sample uncertainties and levers, and perform the resulting experiments
    on each of the models

    Parameters
    ----------
    models : one or more AbstractModel instances
    scenarios : int or collection of Scenario instances, optional
    policies : int or collection of Policy instances, optional
    evaluator : Evaluator instance, optional
    reporting_interval : int, optional
    reporting_frequency : int, optional
    uncertainty_union : boolean, optional
    lever_union : boolean, optional
    outcome_union : boolean, optional
    uncertainty_sampling : {LHS, MC, FF, PFF, SOBOL, MORRIS, FAST}, optional
    lever_sampling : {LHS, MC, FF, PFF, SOBOL, MORRIS, FAST}, optional TODO:: update doc
    callback : Callback instance, optional
    return_callback : boolean, optional
    log_progress : bool, optional
    combine : {'factorial', 'zipover'}, optional
              how to combine uncertainties and levers?
              In case of 'factorial', both are sampled separately using their
              respective samplers. Next the resulting designs are combined in a
              full factorial manner.
              In case of 'zipover', both are sampled separately and
              then combined by cycling over the shortest of the two sets
              of designs until the longest set of designs is exhausted.

    Returns
    -------
    tuple
        the experiments as a dataframe, and a dict
        with the name of an outcome as key, and the associated values
        as numpy array. Experiments and outcomes are aligned on index.


    """
    # TODO:: break up into helper functions
    # unreadable in this form

    if not scenarios and not policies:
        raise EMAError("no experiments possible since both scenarios and policies are 0")

    scenarios, uncertainties, n_scenarios = setup_scenarios(
        scenarios, uncertainty_sampling, uncertainty_union, models
    )
    policies, levers, n_policies = setup_policies(policies, lever_sampling, lever_union, models)

    try:
        n_models = len(models)
    except TypeError:
        n_models = 1

    outcomes = determine_objects(models, "outcomes", union=outcome_union)

    if combine == "factorial":
        nr_of_exp = n_models * n_scenarios * n_policies

        # TODO:: change to 0 policies / 0 scenarios if sampling is set to 0 for
        # it
        _logger.info(
            "performing {} scenarios * {} policies * {} model(s) = {} experiments".format(
                n_scenarios, n_policies, n_models, nr_of_exp
            )
        )
    else:
        nr_of_exp = n_models * max(n_scenarios, n_policies)
        # TODO:: change to 0 policies / 0 scenarios if sampling is set to 0 for
        # it
        _logger.info(
            "performing max({} scenarios, {} policies) * {} model(s) = {} experiments".format(
                n_scenarios, n_policies, n_models, nr_of_exp
            )
        )

    callback = setup_callback(
        callback,
        uncertainties,
        levers,
        outcomes,
        nr_of_exp,
        reporting_interval,
        reporting_frequency,
        log_progress,
    )

    if not evaluator:
        evaluator = SequentialEvaluator(models)

    evaluator.evaluate_experiments(scenarios, policies, callback, combine=combine)

    if callback.i != nr_of_exp:
        raise EMAError(
            (
                "some fatal error has occurred while "
                "running the experiments, not all runs have "
                "completed. expected {}, got {}"
            ).format(nr_of_exp, callback.i)
        )

    _logger.info("experiments finished")

    if return_callback:
        return callback

    results = callback.get_results()
    return results

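# Illustrative usage sketch (not part of this module; assumes `model` is an
# AbstractModel instance defined elsewhere): with the default factorial combine,
# 100 scenarios * 4 policies gives 400 experiments per model; when no evaluator
# is passed, a SequentialEvaluator is created internally.
#
#     experiments, outcomes = perform_experiments(model, scenarios=100, policies=4)
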
def setup_callback(
    callback,
    uncertainties,
    levers,
    outcomes,
    nr_of_exp,
    reporting_interval,
    reporting_frequency,
    log_progress,
):
    if not callback:
        callback = DefaultCallback(
            uncertainties,
            levers,
            outcomes,
            nr_of_exp,
            reporting_interval=reporting_interval,
            reporting_frequency=reporting_frequency,
            log_progress=log_progress,
        )
    else:
        callback = callback(
            uncertainties,
            levers,
            outcomes,
            nr_of_exp,
            reporting_interval=reporting_interval,
            reporting_frequency=reporting_frequency,
            log_progress=log_progress,
        )
    return callback


def setup_policies(policies, levers_sampling, lever_union, models):
    if not policies:
        policies = [Policy("None", **{})]
        levers = []
        n_policies = 1
    elif isinstance(policies, numbers.Integral):
        sampler = levers_sampling

        if not isinstance(sampler, AbstractSampler):
            sampler = sampler.value

        policies = sample_levers(models, policies, union=lever_union, sampler=sampler)
        levers = policies.parameters
        n_policies = policies.n
    else:
        try:
            levers = policies.parameters
            n_policies = policies.n
        except AttributeError:
            levers = determine_objects(models, "levers", union=True)
            if isinstance(policies, Policy):
                policies = [policies]

            levers = [l for l in levers if l.name in policies[0]]
            n_policies = len(policies)
    return policies, levers, n_policies


def setup_scenarios(scenarios, uncertainty_sampling, uncertainty_union, models):
    if not scenarios:
        scenarios = [Scenario("None", **{})]
        uncertainties = []
        n_scenarios = 1
    elif isinstance(scenarios, numbers.Integral):
        sampler = uncertainty_sampling
        if not isinstance(sampler, AbstractSampler):
            sampler = sampler.value
        scenarios = sample_uncertainties(
            models, scenarios, sampler=sampler, union=uncertainty_union
        )
        uncertainties = scenarios.parameters
        n_scenarios = scenarios.n
    else:
        try:
            uncertainties = scenarios.parameters
            n_scenarios = scenarios.n
        except AttributeError:
            uncertainties = determine_objects(models, "uncertainties", union=True)
            if isinstance(scenarios, Scenario):
                scenarios = [scenarios]

            uncertainties = [u for u in uncertainties if u.name in scenarios[0]]
            n_scenarios = len(scenarios)
    return scenarios, uncertainties, n_scenarios


def optimize(
    models,
    algorithm=EpsNSGAII,
    nfe=10000,
    searchover="levers",
    evaluator=None,
    reference=None,
    convergence=None,
    constraints=None,
    convergence_freq=1000,
    logging_freq=5,
    variator=None,
    **kwargs,
):
    """optimize the model

    Parameters
    ----------
    models : 1 or more Model instances
    algorithm : a valid Platypus optimization algorithm
    nfe : int
    searchover : {'uncertainties', 'levers'}
    evaluator : evaluator instance
    reference : Policy or Scenario instance, optional
                overwrite the default scenario in case of searching over
                levers, or default policy in case of searching over
                uncertainties
    convergence : function or collection of functions, optional
    constraints : list, optional
    convergence_freq : int
                       nfe between convergence check
    logging_freq : int
                   number of generations between logging of progress
    variator : platypus GAOperator instance, optional
               if None, it falls back on the defaults in platypus-opts
               which is SBX with PM
    kwargs : any additional arguments will be passed on to algorithm

    Returns
    -------
    pandas DataFrame

    Raises
    ------
    EMAError if searchover is not one of 'uncertainties' or 'levers'
    NotImplementedError if len(models) > 1

    """
    if searchover not in ("levers", "uncertainties"):
        raise EMAError(
            "searchover should be one of 'levers' or 'uncertainties', not {}".format(searchover)
        )

    try:
        if len(models) == 1:
            models = models[0]
        else:
            raise NotImplementedError("optimization over multiple models not yet supported")
    except TypeError:
        pass

    problem = to_problem(models, searchover, constraints=constraints, reference=reference)

    # solve the optimization problem
    if not evaluator:
        evaluator = SequentialEvaluator(models)

    return _optimize(
        problem,
        evaluator,
        algorithm,
        convergence,
        nfe,
        convergence_freq,
        logging_freq,
        variator=variator,
        **kwargs,
    )

def robust_optimize(
    model,
    robustness_functions,
    scenarios,
    evaluator=None,
    algorithm=EpsNSGAII,
    nfe=10000,
    convergence=None,
    constraints=None,
    convergence_freq=1000,
    logging_freq=5,
    **kwargs,
):
    """perform robust optimization

    Parameters
    ----------
    model : model instance
    robustness_functions : collection of ScalarOutcomes
    scenarios : int, or collection
    evaluator : Evaluator instance
    algorithm : platypus Algorithm instance
    nfe : int
    convergence : list
                  list of convergence metrics
    constraints : list
    convergence_freq : int
                       nfe between convergence check
    logging_freq : int
                   number of generations between logging of progress
    kwargs : any additional arguments will be passed on to algorithm

    Raises
    ------
    AssertionError if robustness_function is not a ScalarOutcome,
    if robustness_function.kind is INFO, or
    if robustness_function.function is None

    robustness functions are scalar outcomes, kind should be MINIMIZE or
    MAXIMIZE, function is the robustness function you want to use.

    """
    for rf in robustness_functions:
        assert isinstance(rf, ScalarOutcome)
        assert rf.kind != AbstractOutcome.INFO
        assert rf.function is not None

    problem = to_robust_problem(
        model, scenarios, constraints=constraints, robustness_functions=robustness_functions
    )

    # solve the optimization problem
    if not evaluator:
        evaluator = SequentialEvaluator(model)

    return _optimize(
        problem,
        evaluator,
        algorithm,
        convergence,
        int(nfe),
        convergence_freq,
        logging_freq,
        **kwargs,
    )
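
# Illustrative sketch of a robustness function as described above (not part of
# this module; assumes a model with an outcome named "costs", and uses mean costs
# purely as an example metric):
#
#     import numpy as np
#
#     robustness_functions = [
#         ScalarOutcome(
#             "mean costs",
#             kind=ScalarOutcome.MINIMIZE,
#             variable_name="costs",
#             function=np.mean,
#         )
#     ]
#
#     with MultiprocessingEvaluator(model) as evaluator:
#         results = robust_optimize(
#             model,
#             robustness_functions,
#             scenarios=50,
#             evaluator=evaluator,
#             nfe=5000,
#         )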