• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

quaquel / EMAworkbench / 19097686945

05 Nov 2025 09:40AM UTC coverage: 92.672% (+0.009%) from 92.663%
19097686945

Pull #433

github

web-flow
Merge 6e2022ea7 into 025a50353
Pull Request #433: fixes for warnings

23 of 23 new or added lines in 6 files covered. (100.0%)

35 existing lines in 2 files now uncovered.

8498 of 9170 relevant lines covered (92.67%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.77
/test/test_em_framework/test_futures_ipyparallel.py
1
"""Tests for ipyparallelEvaluator.
2

3
Test code for ema_ipyparallel. The setup and teardown of the cluster is
4
taken from the ipyparallel test code with some minor adaptations
5

6

7
"""
8

9
# Created on Jul 28, 2015
10
# .. codeauthor:: jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>
11

12
import logging
1✔
13
import os
1✔
14
import time
1✔
15
import unittest
1✔
16
import unittest.mock as mock
1✔
17
import warnings
1✔
18
from subprocess import STDOUT, Popen
1✔
19

20
import ipyparallel
1✔
21
from ipyparallel import Client
1✔
22
from ipyparallel.cluster.launcher import (
1✔
23
    SIGKILL,
24
    LocalProcessLauncher,
25
    ProcessStateError,
26
    ipcontroller_cmd_argv,
27
    ipengine_cmd_argv,
28
)
29
from IPython.paths import get_ipython_dir
1✔
30
from jupyter_client.localinterfaces import localhost
1✔
31

32
import ema_workbench
1✔
33
from ema_workbench.em_framework import Model, experiment_runner
1✔
34
from ema_workbench.em_framework import futures_ipyparallel as ema
1✔
35
from ema_workbench.em_framework.futures_ipyparallel import LogWatcher
1✔
36
from ema_workbench.util import EMAError, EMAParallelError, ema_logging
1✔
37

38
# Every launcher (controller and engines) started by this module is appended
# here so that tearDownModule can stop them again.
launchers = []
# Write-only file descriptor on the null device; child processes write their
# stdout/stderr to it.
blackhole = os.open(os.devnull, os.O_WRONLY)

# The ``module`` argument of filterwarnings is a regular expression matched
# against the start of the dotted module name (e.g. "IPython.core.x"), not
# against a file path, so the pattern is anchored on the package name.
warnings.filterwarnings(
    "ignore", category=DeprecationWarning, module=r"IPython(\.|$)"
)
1✔
42

43

44
# Launcher class
45
class MockProcessLauncher(LocalProcessLauncher):
    """LocalProcessLauncher that starts its child through a plain ``Popen``.

    Subclassed to prevent extra sockets and threads being created on Windows.
    """

    def start(self):
        """Spawn the child process.

        Raises
        ------
        ProcessStateError
            if the launcher is not in the "before" state any more.

        """
        if self.state != "before":
            s = f"The process was already started and has state: {self.state!r}"
            raise ProcessStateError(s)

        # stdout goes to the module-level null-device descriptor and stderr
        # is merged into stdout, so all child output is discarded.
        child = Popen(
            self.args,
            stdout=blackhole,
            stderr=STDOUT,
            env=os.environ,
            cwd=self.work_dir,
        )
        self.process = child
        self.notify_start(child.pid)
        self.poll = child.poll
×
64

65

66
def add_engines(n=1, profile="iptest", total=False):
    """Add a number of engines to a given profile.

    If total is True, then already running engines are counted, and only
    the additional engines necessary (if any) are started.

    Parameters
    ----------
    n : int
        number of engines to start, or the desired total if ``total`` is True
    profile : str
        name of the IPython profile passed to the client and the engines
    total : bool
        interpret ``n`` as the total number of engines instead of an increment

    Returns
    -------
    list
        the launchers of the engines started by this call

    Raises
    ------
    RuntimeError
        if an engine process exits before connecting, or if the engines have
        not connected within 15 seconds.

    """
    rc = Client(profile=profile)
    # try/finally so the client connection is released even when one of the
    # RuntimeErrors below is raised.
    try:
        base = len(rc)

        if total:
            n = max(n - base, 0)

        eps = []
        for _ in range(n):
            ep = MockProcessLauncher()
            ep.cmd_and_args = ipengine_cmd_argv + [
                f"--profile={profile}",
                "--InteractiveShell.colors=nocolor",
            ]
            ep.start()
            # registered module-wide so tearDownModule can stop the engine
            launchers.append(ep)
            eps.append(ep)

        # a monotonic clock keeps the timeout immune to wall-clock adjustments
        tic = time.monotonic()
        while len(rc) < base + n:
            if any(ep.poll() is not None for ep in eps):
                raise RuntimeError("A test engine failed to start.")
            elif time.monotonic() - tic > 15:
                raise RuntimeError("Timeout waiting for engines to connect.")
            time.sleep(0.1)
    finally:
        rc.close()
    return eps
1✔
97

98

99
def setUpModule():
    """Start a controller plus engines on the default profile for this module.

    Raises
    ------
    RuntimeError
        if the controller exits, or has not written both connection files
        within 15 seconds.

    """
    security_dir = os.path.join(get_ipython_dir(), "profile_default", "security")
    engine_json = os.path.join(security_dir, "ipcontroller-engine.json")
    client_json = os.path.join(security_dir, "ipcontroller-client.json")
    connection_files = (engine_json, client_json)

    # Remove any pre-existing connection files, so that the wait loop below
    # only succeeds on files written after the controller was started.
    for stale in filter(os.path.exists, connection_files):
        os.remove(stale)

    controller = MockProcessLauncher()
    controller.cmd_and_args = ipcontroller_cmd_argv + [
        "--profile=default",
        "--log-level=20",
    ]
    controller.start()
    launchers.append(controller)

    started = time.time()
    while not all(map(os.path.exists, connection_files)):
        if controller.poll() is not None:
            raise RuntimeError(
                f"The test controller exited with status {controller.poll()}"
            )
        if time.time() - started > 15:
            raise RuntimeError("Timeout waiting for the test controller to start.")
        time.sleep(0.1)

    # make sure a total of two engines is connected to the controller
    add_engines(2, profile="default", total=True)
1✔
120

121

122
def tearDownModule():
    """Stop every launcher registered in ``launchers``.

    Each still-running process is first asked to stop; if it is still alive
    after a short grace period it is sent SIGKILL. Failures are printed
    rather than raised (best-effort cleanup). A KeyboardInterrupt during one
    of the waits aborts the cleanup.
    """
    try:
        time.sleep(1)
    except KeyboardInterrupt:
        return
    while launchers:
        p = launchers.pop()
        if p.poll() is None:
            try:
                p.stop()
            except Exception as e:
                print(e)
        if p.poll() is None:
            # grace period before escalating to SIGKILL
            try:
                time.sleep(0.25)
            except KeyboardInterrupt:
                return
        if p.poll() is None:
            try:
                print("cleaning up test process...")
                p.signal(SIGKILL)
            # Exception instead of a bare except, so that KeyboardInterrupt
            # and SystemExit are not swallowed here.
            except Exception:
                print("couldn't shutdown process: ", p)
×
145

146

147
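# NOTE: blackhole is an integer file descriptor returned by os.open(), so
# closing it requires os.close(blackhole) rather than blackhole.close():
#     os.close(blackhole)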
148

149

150
class TestEngineLoggerAdapter(unittest.TestCase):
    """Tests for EngingeLoggerAdapter and set_engine_logger."""

    def tearDown(self):
        """Reset the logger cached in ema_logging and drop its handlers."""
        ema_logging._logger = None
        logging.getLogger(ema_logging.LOGGER_NAME).handlers = []

    def test_directly(self):
        """The adapter keeps logger and topic and prefixes messages with the topic."""
        with mock.patch("ema_workbench.util.ema_logging._logger") as mocked_logger:
            adapter = ema.EngingeLoggerAdapter(mocked_logger, ema.SUBTOPIC)
            self.assertEqual(mocked_logger, adapter.logger)
            self.assertEqual(ema.SUBTOPIC, adapter.topic)

            text, extra = "test", {}
            msg, kwargs = adapter.process(text, extra)

            self.assertEqual(f"{ema.SUBTOPIC}::{text}", msg)
            self.assertEqual(extra, kwargs)

    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.EngingeLoggerAdapter")
    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.Application")
    def test_engine_logger(self, mocked_application, mocked_adapter):
        """set_engine_logger sets the level and wraps the application logger."""
        # first logger: mock specced on the real root logger
        specced_logger = mock.Mock(spec=ema_logging.get_rootlogger())
        specced_logger.handlers = []
        specced_logger.manager = mock.Mock(spec=logging.Manager)
        specced_logger.manager.disable = 0
        ema_logging._logger = specced_logger

        # second logger: autospec of logging.Logger; its handler list is
        # empty as well, so both passes run without handlers
        autospec_logger = mock.create_autospec(logging.Logger)
        autospec_logger.handlers = []

        mocked_application.instance.return_value = mocked_application

        for app_logger in (specced_logger, autospec_logger):
            mocked_application.log = app_logger

            ema.set_engine_logger()

            app_logger.setLevel.assert_called_once_with(ema_logging.DEBUG)
            mocked_adapter.assert_called_with(app_logger, ema.SUBTOPIC)
1✔
205

206

207
#         mock_engine_handler.setLevel.assert_called_once_with(ema_logging.DEBUG)
208

209

210
#     def test_on_cluster(self):
211
#         client = ipyparallel.Client(profile='default')
212
#         client[:].apply_sync(ema.set_engine_logger)
213
#
214
#         def test_engine_logger():
215
#             from em_framework import ema_logging # @Reimport
216
#             from em_framework import ema_parallel_ipython as ema # @Reimport
217
#
218
#             logger = ema_logging._logger
219
#
220
#             tests = []
221
#             tests.append((type(logger) == ema.EngingeLoggerAdapter,
222
#                           'logger adapter'))
223
#             tests.append((logger.logger.level == ema_logging.DEBUG,
224
#                           'logger level'))
225
#             tests.append((logger.topic == ema.SUBTOPIC,
226
#                           'logger subptopic'))
227
#             return tests
228
#
229
#         for engine in client.ids:
230
#             tests = client[engine].apply_sync(test_engine_logger)
231
#             for test in tests:
232
#                 test, msg = test
233
#                 self.assertTrue(test, msg)
234
#
235
#         client.clear(block=True)
236

237

238
class TestLogWatcher(unittest.TestCase):
    """Tests for LogWatcher: url, level extraction and message forwarding."""

    @classmethod
    def setUpClass(cls):
        """Install a mocked ema logger and create the watcher under test."""
        fake_logger = mock.Mock(spec=ema_logging.get_rootlogger())
        fake_logger.handlers = []
        ema_logging._logger = fake_logger

        cls.client = mock.Mock(spec=ipyparallel.Client)
        cls.url = f"tcp://{localhost()}:20202"
        cls.watcher = LogWatcher()

    @classmethod
    def tearDownClass(cls):
        """Stop the watcher."""
        cls.watcher.stop()
        # TODO use some way to signal the thread to terminate
        # despite that it is a daemon thread

    def tearDown(self):
        """Clear the (mocked) client after every test."""
        self.client.clear(block=True)

    def test_init(self):
        """The watcher listens on the expected url."""
        self.assertEqual(self.url, self.watcher.url)

    def test_extract_level(self):
        """The level is extracted from the topic; without a level INFO is returned."""
        topic = ema.SUBTOPIC
        level = "INFO"
        expected_topics = f"engine.1.{topic}"

        # once with an explicit level in the topic string, once without
        for topic_str in (f"engine.1.{level}.{topic}", f"engine.1.{topic}"):
            extracted_level, extracted_topics = self.watcher._extract_level(topic_str)

            self.assertEqual(ema_logging.INFO, extracted_level)
            self.assertEqual(expected_topics, extracted_topics)

    def test_log_message(self):
        """Valid messages are logged at their level, invalid ones as errors."""
        valid = (
            (b"engine.1.INFO.EMA", ema_logging.INFO),
            (b"engine.1.DEBUG.EMA", ema_logging.DEBUG),
        )
        for raw_topic, level in valid:
            with mock.patch("logging.getLogger") as mocked:
                mocked_logger = mock.Mock(spec=logging.Logger)
                mocked.return_value = mocked_logger

                self.watcher.log_message([raw_topic, b"test"])

                mocked_logger.log.assert_called_once_with(level, "[engine.1] test")

        invalid = (
            [b"engine.1.DEBUG", b"test", b"more"],
            [b"engine1DEBUG", b"test"],
        )
        for raw in invalid:
            with mock.patch("logging.getLogger") as mocked:
                mocked_logger = mock.Mock(spec=logging.Logger)
                mocked.return_value = mocked_logger

                self.watcher.log_message(raw)

                decoded = [part.decode("utf-8") for part in raw]
                mocked_logger.error.assert_called_once_with(
                    f"Invalid log message: {decoded}"
                )
1✔
314

315

316
class TestEngine(unittest.TestCase):
    """Tests for Engine and the engine related helper functions."""

    @classmethod
    def setUpClass(cls):
        """Provide a mocked ipyparallel client on the class."""
        cls.client = mock.Mock(spec=ipyparallel.Client)

    @classmethod
    def tearDownClass(cls):
        """Nothing to clean up."""
        pass

    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.get_engines_by_host")
    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.os")
    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.socket")
    def test_update_cwd_on_all_engines(
        self, mock_socket, mock_os, mock_engines_by_host
    ):
        """Engines on this host get a chdir; engines elsewhere are not supported."""
        mock_socket.gethostname.return_value = "test host"
        mock_os.getcwd.return_value = "/test"

        fake_view = mock.create_autospec(ipyparallel.client.view.View)
        fake_client = mock.create_autospec(ipyparallel.Client)
        fake_client.ids = [0, 1]  # pretend we have two engines
        fake_client.__getitem__.return_value = fake_view

        # engines on same host
        mock_engines_by_host.return_value = {"test host": [0, 1]}
        ema.update_cwd_on_all_engines(fake_client)
        fake_view.apply.assert_called_with(mock_os.chdir, "/test")

        # engines on another host
        mock_engines_by_host.return_value = {"other host": [0, 1]}
        with self.assertRaises(NotImplementedError):
            ema.update_cwd_on_all_engines(fake_client)

    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.socket")
    def test_get_engines_by_host(self, mock_socket):
        """Engine ids are grouped under the host name each engine reports."""
        fake_view = mock.create_autospec(ipyparallel.client.view.View)
        fake_view.apply_sync.return_value = "test host"

        fake_client = mock.create_autospec(ipyparallel.Client)
        fake_client.ids = [0, 1]  # pretend we have two engines
        fake_client.__getitem__.return_value = fake_view

        engines_by_host = ema.get_engines_by_host(fake_client)
        print(engines_by_host)
        self.assertEqual({"test host": [0, 1]}, engines_by_host)

    def test_init(self):
        """A new Engine stores id and models and owns an ExperimentRunner."""
        msis, engine_id = [], 0
        engine = ema.Engine(engine_id, msis, ".")

        self.assertEqual(engine_id, engine.engine_id)
        self.assertEqual(msis, engine.msis)
        self.assertEqual(experiment_runner.ExperimentRunner, type(engine.runner))

    def test_run_experiment(self):
        """Experiments are delegated to the runner; unexpected errors are wrapped."""
        model = Model("test", mock.Mock())
        fake_runner = mock.create_autospec(experiment_runner.ExperimentRunner)

        engine = ema.Engine(0, [model], ".")
        engine.runner = fake_runner

        experiment = {"a": 1}
        engine.run_experiment(experiment)
        fake_runner.run_experiment.assert_called_once_with(experiment)

        # an EMAError raised by the runner is passed on as is, any other
        # exception surfaces as an EMAParallelError
        for raised, expected in ((EMAError, EMAError), (Exception, EMAParallelError)):
            fake_runner.run_experiment.side_effect = raised
            with self.assertRaises(expected):
                engine.run_experiment(experiment)
1✔
399

400

401
class TestIpyParallelUtilFunctions(unittest.TestCase):
    """Tests for the module level helper functions of futures_ipyparallel."""

    def test_initialize_engines(self):
        """Every engine id is initialized with the models and working directory."""
        model = Model("test", mock.Mock())
        msis = {model.name: model}
        cwd = "."
        engine_ids = [0, 1]  # pretend we have two engines

        fake_view = mock.create_autospec(ipyparallel.client.view.View)
        fake_client = mock.create_autospec(ipyparallel.Client)
        fake_client.ids = engine_ids
        fake_client.__getitem__.return_value = fake_view

        ema.initialize_engines(fake_client, msis, cwd)

        for engine_id in (0, 1):
            fake_view.apply_sync.assert_any_call(
                ema._initialize_engine, engine_id, msis, cwd
            )
1✔
419

420

421
class TestIpyParallelEvaluator(unittest.TestCase):
    """Tests for IpyparallelEvaluator."""

    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.set_engine_logger")
    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.initialize_engines")
    @mock.patch("ema_workbench.em_framework.futures_ipyparallel.start_logwatcher")
    @mock.patch("ema_workbench.em_framework.evaluators.DefaultCallback")
    def test_ipyparallel_evaluator(
        self,
        mocked_callback,
        mocked_start,
        mocked_initialize,
        mocked_set,
    ):
        """Evaluating experiments maps them once over the load balanced view."""
        # start_logwatcher returns a pair; the first item stands in for the watcher
        mocked_start.return_value = mocked_start, None

        model = mock.Mock(spec=ema_workbench.Model)
        model.name = "test"

        # NOTE(review): the string is passed positionally, so Mock receives
        # it as its ``spec`` argument -- confirm this is intended.
        mocked_generator = mock.Mock(
            "ema_workbench.em_framework.points.experiment_generator"
        )
        mocked_generator.return_value = [1]

        balanced_view = mock.Mock()
        balanced_view.map.return_value = [(1, ({}, {}))]

        client = mock.MagicMock(spec=ipyparallel.Client)
        client.load_balanced_view.return_value = balanced_view

        with ema.IpyparallelEvaluator(model, client) as evaluator:
            evaluator.evaluate_experiments(mocked_generator, mocked_callback)
            balanced_view.map.assert_called_once()
1✔
451

452

453
if __name__ == "__main__":
    # unittest.main() terminates through SystemExit by default, so a plain
    # statement after it never runs. The finally clause lets the pause
    # execute while the exit status of the test run is preserved.
    # NOTE(review): the pause presumably gives child processes time to shut
    # down -- confirm it is still wanted.
    try:
        unittest.main()
    finally:
        time.sleep(1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc