pyiron / executorlib / 12480245183

24 Dec 2024 10:07AM UTC coverage: 96.374% (+0.007%) from 96.367%
Pull Request #532: More error tests (github / web-flow, merge fcb6c06bf into 5cf3ecc7e)

2 of 2 new or added lines in 1 file covered. (100.0%)
1 existing line in 1 file now uncovered.
1010 of 1048 relevant lines covered (96.37%)
0.96 hits per line

Source File: /executorlib/interactive/shared.py (99.53% covered)
The one uncovered line is the raise ImportError(...) branch in _get_backend_path().

import importlib.util
import os
import queue
import sys
import time
from concurrent.futures import Future
from time import sleep
from typing import Callable, List, Optional

from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
from executorlib.standalone.command import get_command_path
from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import (
    SocketInterface,
    interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.standalone.thread import RaisingThread


class ExecutorBroker(ExecutorBase):
    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
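
        Example (a minimal sketch; uses the InteractiveExecutor subclass defined in this module):

            >>> from executorlib.interactive.executor import InteractiveExecutor
            >>>
            >>> with InteractiveExecutor(max_workers=1) as exe:
            >>>     future = exe.submit(sum, [1, 2, 3])
            >>>     print(future.result())
            6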
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None:
            for _ in range(len(self._process)):
                self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                for process in self._process:
                    process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):
        """
        Set the process for the executor.

        Args:
            process (List[RaisingThread]): The process for the executor.
        """
        self._process = process
        for process in self._process:
            process.start()


class InteractiveExecutor(ExecutorBroker):
    """
    The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib interfaces to distribute python
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
    executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
    the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
    improve the usability, in particular when used in combination with Jupyter notebooks.

    Args:
        max_workers (int): defines the number of workers which can execute functions in parallel
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_workers: int = 1,
        executor_kwargs: dict = {},
        spawner: BaseSpawner = MpiExecSpawner,
    ):
        super().__init__(max_cores=executor_kwargs.get("max_cores", None))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["queue_join_on_shutdown"] = False  # The same queue is shared over multiple threads
        self._set_process(
            process=[
                RaisingThread(
                    target=execute_parallel_tasks,
                    kwargs=executor_kwargs,
                )
                for _ in range(max_workers)
            ],
        )


class InteractiveStepExecutor(ExecutorBase):
    """
    The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
    tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
    can be executed in a serial python process and does not require the python script to be executed with MPI.
    Consequently, it is primarily an abstraction of its functionality to improve the usability, in particular when used
    in combination with Jupyter notebooks.

    Args:
        max_cores (int): defines the number of cores which can be used to execute functions in parallel
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveStepExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> with InteractiveStepExecutor(max_cores=2) as p:
        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: dict = {},
        spawner: BaseSpawner = MpiExecSpawner,
    ):
        super().__init__(max_cores=executor_kwargs.get("max_cores", None))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["max_cores"] = max_cores
        executor_kwargs["max_workers"] = max_workers
        self._set_process(
            RaisingThread(
                target=execute_separate_tasks,
                kwargs=executor_kwargs,
            )
        )


def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: BaseSpawner = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    **kwargs,
) -> None:
    """
    Execute a single task in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start process on selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but MacOS >= 12 seems to disable this lookup for security
                                     reasons, so on MacOS this option has to be set to true.
       init_function (callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
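
    Example (an illustrative sketch; in practice this function runs as the target of a RaisingThread, as in
    InteractiveExecutor above):

        >>> import queue
        >>> from executorlib.standalone.thread import RaisingThread
        >>>
        >>> task_queue = queue.Queue()
        >>> worker = RaisingThread(
        >>>     target=execute_parallel_tasks,
        >>>     kwargs={"future_queue": task_queue, "cores": 1},
        >>> )
        >>> worker.start()
        >>> task_queue.put({"shutdown": True, "wait": True})
        >>> worker.join()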
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            if queue_join_on_shutdown:
                future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            if cache_directory is None:
                _execute_task(
                    interface=interface, task_dict=task_dict, future_queue=future_queue
                )
            else:
                _execute_task_with_cache(
                    interface=interface,
                    task_dict=task_dict,
                    future_queue=future_queue,
                    cache_directory=cache_directory,
                )


def execute_separate_tasks(
    future_queue: queue.Queue,
    spawner: BaseSpawner = MpiExecSpawner,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Execute each submitted task in a separate process, spawned on demand.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       spawner (BaseSpawner): Interface to start process on selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                          cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                          recommended, as computers have a limited number of compute cores.
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but MacOS >= 12 seems to disable this lookup for security
                                     reasons, so on MacOS this option has to be set to true.
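
    Example (an illustrative sketch with a hand-built task dictionary; submit() normally constructs these):

        >>> import queue
        >>> from concurrent.futures import Future
        >>>
        >>> task_queue = queue.Queue()
        >>> f = Future()
        >>> task_queue.put(
        >>>     {"fn": sum, "args": ([1, 2],), "kwargs": {}, "future": f, "resource_dict": {"cores": 1}}
        >>> )
        >>> task_queue.put({"shutdown": True, "wait": True})
        >>> execute_separate_tasks(future_queue=task_queue, max_cores=1)
        >>> print(f.result())
        3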
    """
    active_task_dict = {}
    process_lst, qtask_lst = [], []
    if "cores" not in kwargs.keys():
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            qtask = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                max_workers=max_workers,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()


def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects from
    other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
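
    Example (a sketch of the dependency pattern this function enables; the top-level executorlib Executor wires
    these queues up internally):

        >>> from executorlib import Executor
        >>>
        >>> with Executor() as exe:
        >>>     future_sum = exe.submit(sum, [1, 1])
        >>>     fs = exe.submit(sum, [future_sum, 1])  # a Future object used as input
        >>>     print(fs.result())
        3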
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None
            and "fn" in task_dict.keys()
            and "future" in task_dict.keys()
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)


def _get_backend_path(
    cores: int,
) -> list:
    """
    Get the command to call the backend as a list of two strings.

    Args:
        cores (int): Number of cores used to execute the task; if it is greater than one, use
                     interactive_parallel.py, otherwise interactive_serial.py.

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
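
    Example (illustrative only; the exact paths depend on the local Python installation, and the paths shown here
    are made up):

        >>> _get_backend_path(cores=1)
        ['/usr/bin/python3', '/usr/lib/python3/site-packages/executorlib/backend/interactive_serial.py']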
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        raise ImportError(
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst


def _wait_for_free_slots(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
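
    Example (a hand-built sketch; the Future used as key has already finished, so it is dropped from the dictionary):

        >>> from concurrent.futures import Future
        >>> f = Future()
        >>> f.set_result(None)
        >>> _wait_for_free_slots(active_task_dict={f: 2}, cores_requested=2, max_cores=2)
        {}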
    """
    if max_cores is not None:
        while sum(active_task_dict.values()) + cores_requested > max_cores:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    elif max_workers is not None and max_cores is None:
        while len(active_task_dict.values()) + 1 > max_workers:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    return active_task_dict


def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Submit the waiting tasks whose future inputs have been completed to the executor.

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose future inputs have not yet been completed
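
    Example (a hand-built sketch; the single waiting task becomes ready and is moved to the executor queue):

        >>> import queue
        >>> from concurrent.futures import Future
        >>>
        >>> f = Future()
        >>> f.set_result(1)
        >>> executor_queue = queue.Queue()
        >>> _submit_waiting_task(
        >>>     wait_lst=[{"fn": sum, "args": ([f, 1],), "kwargs": {}, "future_lst": [f]}],
        >>>     executor_queue=executor_queue,
        >>> )
        []
        >>> executor_queue.get()["args"]
        [[1, 1]]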
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all([future.done() for future in task_wait_dict["future_lst"]]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst


def _update_futures_in_input(args: tuple, kwargs: dict):
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
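
    Example (a hand-built sketch; nested lists are resolved recursively):

        >>> from concurrent.futures import Future
        >>> f = Future()
        >>> f.set_result(2)
        >>> _update_futures_in_input(args=(f, [f, 3]), kwargs={"k": f})
        ([2, [2, 3]], {'k': 2})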
    """

    def get_result(arg):
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = [get_result(arg=arg) for arg in args]
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs


def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and which of these future objects are already done.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag if all future objects are already done
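
    Example (a hand-built sketch; the Future is still pending, so the flag is False):

        >>> from concurrent.futures import Future
        >>> f = Future()
        >>> future_lst, all_done = _get_future_objects_from_input(
        >>>     task_dict={"fn": sum, "args": ([f],), "kwargs": {}}
        >>> )
        >>> future_lst == [f], all_done
        (True, False)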
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag


def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    spawner: BaseSpawner,
    executor_kwargs: dict,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
):
    """
    Submit a function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        spawner (BaseSpawner): Interface to start process on selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve its own hostname to the same
                                      address as localhost, but MacOS >= 12 seems to disable this lookup for security
                                      reasons, so on MacOS this option has to be set to true.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict").copy()
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict.keys() or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=slots_required,
        max_cores=max_cores,
        max_workers=max_workers,
    )
    active_task_dict[task_dict["future"]] = slots_required
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "spawner": spawner,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict


def _execute_task(
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
    """
    Execute the task in the task_dict by communicating it via the interface.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
    """
    f = task_dict.pop("future")
    if f.set_running_or_notify_cancel():
        try:
            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
        except Exception as thread_exception:
            interface.shutdown(wait=True)
            future_queue.task_done()
            f.set_exception(exception=thread_exception)
            raise thread_exception
        else:
            future_queue.task_done()


def _execute_task_with_cache(
    interface: SocketInterface,
    task_dict: dict,
    future_queue: queue.Queue,
    cache_directory: str,
):
    """
    Execute the task in the task_dict by communicating it via the interface, using the cache in the cache directory.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.
    """
    from executorlib.standalone.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        fn=task_dict["fn"],
        fn_args=task_dict["args"],
        fn_kwargs=task_dict["kwargs"],
        resource_dict=task_dict.get("resource_dict", {}),
    )
    os.makedirs(cache_directory, exist_ok=True)
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    if task_key + ".h5out" not in os.listdir(cache_directory):
        f = task_dict.pop("future")
        if f.set_running_or_notify_cancel():
            try:
                time_start = time.time()
                result = interface.send_and_receive_dict(input_dict=task_dict)
                data_dict["output"] = result
                data_dict["runtime"] = time.time() - time_start
                dump(file_name=file_name, data_dict=data_dict)
                f.set_result(result)
            except Exception as thread_exception:
                interface.shutdown(wait=True)
                future_queue.task_done()
                f.set_exception(exception=thread_exception)
                raise thread_exception
            else:
                future_queue.task_done()
    else:
        _, result = get_output(file_name=file_name)
        future = task_dict["future"]
        future.set_result(result)
        future_queue.task_done()