
quaquel / EMAworkbench / 6695764125

30 Oct 2023 04:25PM UTC · coverage: 80.893% (remained the same)

Trigger: push · Source: github · Committer: web-flow

examples: Speedup the lake_problem function by ~30x (#301)

Speed up the lake_problem function by around 30x.

Benchmark: average duration of 100 lake_model experiments: 0.940 seconds vs. 0.030 seconds (speedup: 31.41x).

I validated the output and it is consistent with the previous output.

The improved performance might allow more and broader analysis to be done in the exercises.

Also print a warning on the decisions KeyError
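The headline number is simply the ratio of the two average durations: 0.940 s / 0.030 s is roughly 31x. A timing harness for this kind of benchmark could look like the sketch below; run_once is a hypothetical placeholder for a single lake_model call, not the actual script behind the numbers above.

import time

def run_once():
    """Hypothetical placeholder: substitute one call of the lake_model function here."""
    pass

N = 100
start = time.perf_counter()
for _ in range(N):
    run_once()
average = (time.perf_counter() - start) / N

print(f"average duration over {N} runs: {average:.3f} s")
# speedup = old_average / new_average, e.g. 0.940 / 0.030 is roughly 31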

4619 of 5710 relevant lines covered (80.89%)

0.81 hits per line

Source File

/ema_workbench/em_framework/ema_ipyparallel.py (83.12% of lines covered)

"""
This module provides functionality for combining the EMA workbench
with IPython parallel.

"""
import collections
import logging
import os
import shutil
import socket
import threading

import zmq
from ipyparallel.engine.log import EnginePUBHandler
from jupyter_client.localinterfaces import localhost
from traitlets import Unicode, Instance, List
from traitlets.config import Application
from traitlets.config.configurable import LoggingConfigurable
from zmq.eventloop import ioloop, zmqstream

from . import experiment_runner
from .ema_multiprocessing import setup_working_directories
from .model import AbstractModel
from .util import NamedObjectMap
from ..util import ema_exceptions, ema_logging, get_module_logger

# Created on Jul 16, 2015
#
# .. codeauthor::  jhkwakkel

SUBTOPIC = ema_logging.LOGGER_NAME
engine = None

_logger = get_module_logger(__name__)


class EngingeLoggerAdapter(logging.LoggerAdapter):
    """LoggerAdapter that inserts EMA as a topic into log messages

    Parameters
    ----------
    logger : logger instance
    topic : str

    """

    def __init__(self, logger, topic):
        super().__init__(logger, None)
        self.topic = topic

    def process(self, msg, kwargs):
        msg = f"{self.topic}::{msg}"
        return msg, kwargs


def start_logwatcher():
    """convenience function for starting the LogWatcher

    Returns
    -------
    LogWatcher
        the log watcher instance
    Thread
        the log watcher thread

    .. note : there can only be one log watcher on a given url.

    """

    logwatcher = LogWatcher()

    def starter():
        logwatcher.start()
        try:
            logwatcher.loop.start()
        except (zmq.error.ZMQError, OSError):
            _logger.warning("shutting down log watcher")
        except RuntimeError:
            pass

    logwatcher_thread = threading.Thread(target=starter)
    logwatcher_thread.daemon = True
    logwatcher_thread.start()

    return logwatcher, logwatcher_thread


def set_engine_logger():
    """Updates EMA logging on the engines with an EngineLoggerAdapter
    This adapter injects EMA as a topic into all messages
    """

    logger = Application.instance().log
    logger.setLevel(ema_logging.DEBUG)

    for handler in logger.handlers:
        if isinstance(handler, EnginePUBHandler):  # @UndefinedVariable
            handler.setLevel(ema_logging.DEBUG)

    adapter = EngingeLoggerAdapter(logger, SUBTOPIC)
    ema_logging._logger = adapter

    _logger.debug("updated logger")


def get_engines_by_host(client):
    """returns the engine ids by host

    Parameters
    ----------
    client : IPython.parallel.Client instance

    Returns
    -------
    dict
        a dict with hostnames as keys, and a list
        of engine ids

    """

    results = {i: client[i].apply_sync(socket.gethostname) for i in client.ids}

    engines_by_host = collections.defaultdict(list)
    for engine_id, host in results.items():
        engines_by_host[host].append(engine_id)
    return engines_by_host


def update_cwd_on_all_engines(client):
    """updates the current working directory on the engines to point to the
    same working directory as this notebook

    currently only works if engines are on same machine.

    Parameters
    ----------
    client : IPython.parallel.Client instance

    """

    engines_by_host = get_engines_by_host(client)

    notebook_host = socket.gethostname()
    for key, value in engines_by_host.items():
        if key == notebook_host:
            cwd = os.getcwd()

            # easy, we know the correct cwd
            for engine in value:
                client[engine].apply(os.chdir, cwd)
        else:
            raise NotImplementedError("not yet supported")


class LogWatcher(LoggingConfigurable):
    """A simple class that receives messages on a SUB socket, as published
    by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.

    This can subscribe to multiple topics, but defaults to all topics.
    """

    # configurables
    topics = List(
        [""], help="The ZMQ topics to subscribe to. Default is to subscribe to all messages"
    ).tag(config=True)
    url = Unicode(help="ZMQ url on which to listen for log messages").tag(config=True)

    # internals
    stream = Instance("zmq.eventloop.zmqstream.ZMQStream", allow_none=True)
    context = Instance(zmq.Context)
    loop = Instance("tornado.ioloop.IOLoop")  # @UndefinedVariable

    def _url_default(self):
        return f"tcp://{localhost()}:20202"

    def _context_default(self):
        return zmq.Context.instance()

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.s = self.context.socket(zmq.SUB)  # @UndefinedVariable
        self.s.bind(self.url)
        self.stream = zmqstream.ZMQStream(self.s, self.loop)
        self.subscribe()
        self.observe(self.subscribe, "topics")

    def start(self):
        self.stream.on_recv(self.log_message)

    def stop(self):
        self.stream.setsockopt(zmq.UNSUBSCRIBE, b"")  # @UndefinedVariable
        self.stream.stop_on_recv()
        self.s.close()

    def subscribe(self):
        """Update our SUB socket's subscriptions."""
        self.stream.setsockopt(zmq.UNSUBSCRIBE, b"")  # @UndefinedVariable
        if "" in self.topics:
            self.log.debug("Subscribing to: everything")
            self.stream.setsockopt(zmq.SUBSCRIBE, b"")  # @UndefinedVariable
        else:
            for topic in self.topics:
                self.log.debug(f"Subscribing to: {topic!r}")
                # @UndefinedVariable
                self.stream.setsockopt(zmq.SUBSCRIBE, topic)

    def _extract_level(self, topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
        topics = topic_str.split(".")
        for idx, t in enumerate(topics):
            level = getattr(logging, t, None)
            if level is not None:
                break

        if level is None:
            level = logging.INFO
        else:
            topics.pop(idx)

        return level, ".".join(topics)

    def log_message(self, raw):
        """receive and parse a message, then log it."""
        raw = [r.decode("utf-8") for r in raw]

        if len(raw) != 2 or "." not in raw[0]:
            logging.getLogger().error(f"Invalid log message: {raw}")
            return
        else:
            topic, msg = raw
            level, topic = self._extract_level(topic)

            # bit of a hacky way to filter messages
            # assumes subtopic only contains a single dot
            # main topic is now the substring with the origin of the message
            # so e.g. engine.1
            main_topic, subtopic = topic.rsplit(".", 1)
            log = logging.getLogger(subtopic)

            if msg[-1] == "\n":
                msg = msg[:-1]

            log.log(level, f"[{main_topic}] {msg}")


class Engine:
    """class for setting up ema specific stuff on each engine
    also functions as a convenient namespace for workbench
    relevant variables

    Parameters
    ----------
    engine_id : int
    msis : list
    cwd : str

    """

    def __init__(self, engine_id, msis, cwd):
        _logger.debug(f"starting engine {engine_id}")
        self.engine_id = engine_id
        self.msis = msis

        _logger.debug(f"setting root working directory to {cwd}")
        os.chdir(cwd)

        models = NamedObjectMap(AbstractModel)
        models.extend(msis)
        self.runner = experiment_runner.ExperimentRunner(models)

        self.tmpdir = setup_working_directories(msis, os.getcwd())

    def cleanup_working_directory(self):
        """remove the root working directory of the engine"""
        if self.tmpdir:
            shutil.rmtree(self.tmpdir)

    def run_experiment(self, experiment):
        """run the experiment, the actual running is delegated
        to an ExperimentRunner instance"""

        try:
            return self.runner.run_experiment(experiment)
        except ema_exceptions.EMAError:
            raise
        except Exception as e:
            raise ema_exceptions.EMAParallelError(str(e))


def initialize_engines(client, msis, cwd):
    """initialize engine instances on all engines

    Parameters
    ----------
    client : IPython.parallel.Client
    msis : dict
           dict of model structure interfaces with their names as keys
    cwd : str

    """
    for i in client.ids:
        client[i].apply_sync(_initialize_engine, i, msis, cwd)


def cleanup(client):
    """cleanup directory tree structure on all engines"""
    client[:].apply_sync(_cleanup)


# engines can only deal with functions, not with object.method calls
# these functions are wrappers around the relevant Engine methods
# the engine instance is part of the namespace of the module.
def _run_experiment(experiment):
    """wrapper function for engine.run_experiment"""

    return experiment, engine.run_experiment(experiment)


def _initialize_engine(engine_id, msis, cwd):
    """wrapper function for initializing an engine"""
    global engine
    engine = Engine(engine_id, msis, cwd)


def _cleanup():
    """wrapper function for engine.cleanup_working_directory"""
    global engine
    engine.cleanup_working_directory()
    del engine
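For orientation, here is a minimal usage sketch of how the helpers in this module fit together, assuming an ipyparallel cluster is already running. msis and experiments are placeholders for the model structure interfaces and experiment descriptions that the workbench normally supplies; this is not the workbench's own evaluator code, which wraps these steps for you.

import os

import ipyparallel

from ema_workbench.em_framework import ema_ipyparallel

client = ipyparallel.Client()  # connect to a running cluster

msis = []         # placeholder: list of AbstractModel instances
experiments = []  # placeholder: iterable of experiments to run

# route engine log messages back to this process and tag them with the EMA topic
logwatcher, logwatcher_thread = ema_ipyparallel.start_logwatcher()
client[:].apply_sync(ema_ipyparallel.set_engine_logger)

# make sure every engine uses the same working directory as this process
ema_ipyparallel.update_cwd_on_all_engines(client)

# create an Engine instance (and temporary working directories) on each engine
ema_ipyparallel.initialize_engines(client, msis, os.getcwd())

# farm the experiments out to the engines via a load-balanced view
lb_view = client.load_balanced_view()
results = lb_view.map(ema_ipyparallel._run_experiment, experiments, ordered=False)
for experiment, outcome in results:
    pass  # collect or store the outcomes here

# remove the temporary working directories and stop watching the logs
ema_ipyparallel.cleanup(client)
logwatcher.stop()

In normal use these functions are not called directly; they exist so that work can be shipped to engines as plain module-level functions, since, as the comment in the module notes, engines can only deal with functions, not with object.method calls.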