• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 13116578816

03 Feb 2025 03:11PM UTC coverage: 96.536%. Remained the same
13116578816

Pull #555

github

web-flow
Merge 70bacefb9 into 5ec2a2015
Pull Request #555: Add linting

14 of 15 new or added lines in 7 files covered. (93.33%)

17 existing lines in 7 files now uncovered.

1059 of 1097 relevant lines covered (96.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.53
/executorlib/interactive/shared.py
1
import importlib.util
1✔
2
import os
1✔
3
import queue
1✔
4
import sys
1✔
5
import time
1✔
6
from concurrent.futures import Future
1✔
7
from time import sleep
1✔
8
from typing import Any, Callable, List, Optional, Tuple, Union
1✔
9

10
from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
1✔
11
from executorlib.standalone.command import get_command_path
1✔
12
from executorlib.standalone.inputcheck import (
1✔
13
    check_resource_dict,
14
    check_resource_dict_is_empty,
15
)
16
from executorlib.standalone.interactive.communication import (
1✔
17
    SocketInterface,
18
    interface_bootup,
19
)
20
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
1✔
21
from executorlib.standalone.serialize import serialize_funct_h5
1✔
22
from executorlib.standalone.thread import RaisingThread
1✔
23

24

25
class ExecutorBroker(ExecutorBase):
    """
    Executor front end which feeds one shared task queue that is consumed by a list of worker threads.
    """

    def submit(self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs) -> Future:  # type: ignore
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (Callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict, optional): resource dictionary, which defines the resources used for the execution
                                  of the function. It is only validated with check_resource_dict_is_empty() and is
                                  not attached to the queued task. Omitting it is equivalent to passing an empty
                                  dictionary. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        # None replaces the former shared mutable default ({}); the checks still receive a dictionary.
        if resource_dict is None:
            resource_dict = {}
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f: Future = Future()
        # After shutdown() the queue is None; the future is then returned without being scheduled.
        if self._future_queue is not None:
            self._future_queue.put(
                {"fn": fn, "args": args, "kwargs": kwargs, "future": f}
            )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if self._future_queue is not None:
            if cancel_futures:
                cancel_items_in_queue(que=self._future_queue)
            if isinstance(self._process, list):
                # Queue one shutdown message per worker thread so that each of them can pick one up.
                for _ in self._process:
                    self._future_queue.put({"shutdown": True, "wait": wait})
                if wait:
                    for process in self._process:
                        process.join()
                    self._future_queue.join()
        # Dropping both references is what makes repeated shutdown() calls a no-op.
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):  # type: ignore
        """
        Set the process for the executor.

        Args:
            process (List[RaisingThread]): The worker threads of the executor, each of them is started immediately.
        """
        self._process = process
        for process_instance in self._process:
            process_instance.start()
97

98

99
class InteractiveExecutor(ExecutorBroker):
    """
    The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib interfaces to distribute python
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
    executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
    the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
    improve the usability in particular when used in combination with Jupyter notebooks.

    Args:
        max_workers (int): defines the number workers which can execute functions in parallel
        executor_kwargs (dict, optional): keyword arguments for the executor. The dictionary is copied, so the object
                                          passed by the caller is not modified.
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_workers: int = 1,
        executor_kwargs: Optional[dict] = None,
        spawner: type[BaseSpawner] = MpiExecSpawner,
    ):
        # Work on a private copy: the keys written below must neither leak into a shared default
        # dictionary nor into the dictionary owned by the caller.
        executor_kwargs = {} if executor_kwargs is None else dict(executor_kwargs)
        super().__init__(max_cores=executor_kwargs.get("max_cores"))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        # ExecutorBroker.shutdown() joins the shared queue itself, the worker threads must not do it.
        executor_kwargs["queue_join_on_shutdown"] = False
        self._set_process(
            process=[
                RaisingThread(
                    target=execute_parallel_tasks,
                    kwargs=executor_kwargs,
                )
                for _ in range(max_workers)
            ],
        )
152

153

154
class InteractiveStepExecutor(ExecutorBase):
    """
    The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
    tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
    can be executed in a serial python process and does not require the python script to be executed with MPI.
    Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used
    in combination with Jupyter notebooks.

    Args:
        max_cores (int, optional): defines the number cores which can be used in parallel, forwarded to
                                   execute_separate_tasks()
        max_workers (int, optional): for backwards compatibility with the standard library, forwarded to
                                     execute_separate_tasks() where it limits the number of parallel tasks when
                                     max_cores is not set
        executor_kwargs (dict, optional): keyword arguments for the executor. The dictionary is copied, so the object
                                          passed by the caller is not modified.
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveStepExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> with InteractiveStepExecutor(max_cores=2) as p:
        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: Optional[dict] = None,
        spawner: type[BaseSpawner] = MpiExecSpawner,
    ):
        # Work on a private copy: with a shared default dictionary the "max_cores" entry stored below
        # would be picked up by the next instance created without executor_kwargs.
        executor_kwargs = {} if executor_kwargs is None else dict(executor_kwargs)
        # NOTE(review): the base class receives the "max_cores" entry of executor_kwargs, not the
        # max_cores argument of this constructor - confirm that this is intended.
        super().__init__(max_cores=executor_kwargs.get("max_cores"))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["max_cores"] = max_cores
        executor_kwargs["max_workers"] = max_workers
        self._set_process(
            RaisingThread(
                target=execute_separate_tasks,
                kwargs=executor_kwargs,
            )
        )
204

205

206
def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    **kwargs,
) -> None:
    """
    Start one backend process and keep feeding it with the tasks received from the queue until a shutdown message
    arrives.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start process on selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this essential to be able to communicate to an
                                     Executor running on a different compute node within the same allocation. And
                                     in principle any computer should be able to resolve that their own hostname
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
                                     this look up for security reasons. So on MacOS it is required to set this
                                     option to true
       init_function (Callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. With the default None the tasks are
                                        executed without the cache.
       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
       **kwargs: additional keyword arguments which are handed to the spawner
    """
    backend_command = _get_backend_path(cores=cores)
    interface = interface_bootup(
        command_lst=backend_command,
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
    )
    if init_function is not None:
        init_message = {"init": True, "fn": init_function, "args": (), "kwargs": {}}
        interface.send_dict(input_dict=init_message)
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            if queue_join_on_shutdown:
                future_queue.join()
            return
        if "fn" not in task_dict or "future" not in task_dict:
            # Messages which are neither a shutdown request nor a task are skipped.
            continue
        if cache_directory is None:
            _execute_task(
                interface=interface, task_dict=task_dict, future_queue=future_queue
            )
        else:
            _execute_task_with_cache(
                interface=interface,
                task_dict=task_dict,
                future_queue=future_queue,
                cache_directory=cache_directory,
            )
265

266

267
def execute_separate_tasks(
    future_queue: queue.Queue,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Receive tasks from the queue and hand each of them to _submit_function_to_separate_process(), which starts a
    dedicated thread for the task, until a shutdown message arrives.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       spawner (BaseSpawner): Interface to start process on selected compute resources
       max_cores (int): defines the number cores which can be used in parallel
       max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                          cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                          recommended, as computers have a limited number of compute cores.
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this essential to be able to communicate to an
                                     Executor running on a different compute node within the same allocation. And
                                     in principle any computer should be able to resolve that their own hostname
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
                                     this look up for security reasons. So on MacOS it is required to set this
                                     option to true
       **kwargs: executor keyword arguments forwarded for every task, "cores" defaults to 1 when it is missing
    """
    active_task_dict: dict = {}
    started_threads: list = []
    task_queues: list = []  # only collected, never read back inside this function
    kwargs.setdefault("cores", 1)
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            if task_dict["wait"]:
                for thread in started_threads:
                    thread.join()
            future_queue.task_done()
            future_queue.join()
            return
        if "fn" in task_dict and "future" in task_dict:
            qtask: queue.Queue = queue.Queue()
            thread, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                max_workers=max_workers,
                hostname_localhost=hostname_localhost,
            )
            task_queues.append(qtask)
            started_threads.append(thread)
            future_queue.task_done()
321

322

323
def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Forward tasks to the internal executor once all concurrent.futures.Future objects in their arguments are done.
    Tasks with unfinished future objects are parked in a wait list which is re-checked whenever no new task was
    handled.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
    """
    parked_tasks: list = []
    while True:
        # Poll without blocking, so the parked tasks can be re-checked while the queue is empty.
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None

        received = task_dict is not None
        if received and task_dict.get("shutdown", False):
            # shutdown the executor
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break

        if received and "fn" in task_dict and "future" in task_dict:
            # handle function submitted to the executor
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if ready_flag or not future_lst:
                # No future objects are used in the input or all future objects are already done
                resolved_args, resolved_kwargs = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                task_dict["args"] = resolved_args
                task_dict["kwargs"] = resolved_kwargs
                executor_queue.put(task_dict)
            else:
                # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                parked_tasks.append(task_dict)
            future_queue.task_done()
            continue

        if parked_tasks:
            waiting_before = len(parked_tasks)
            # Check functions in the wait list and execute them if all future objects are now ready
            parked_tasks = _submit_waiting_task(
                wait_lst=parked_tasks, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(parked_tasks) == waiting_before:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)
382

383

384
def _get_backend_path(
    cores: int,
) -> list:
    """
    Build the command to call the backend: the python executable followed by the backend script.

    Args:
        cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute

    Raises:
        ImportError: if more than one core is requested but mpi4py cannot be found
    """
    if cores > 1:
        # The parallel backend is only selected when mpi4py can be located.
        if importlib.util.find_spec("mpi4py") is None:
            raise ImportError(
                "mpi4py is required for parallel calculations. Please install mpi4py."
            )
        backend_script = "interactive_parallel.py"
    else:
        backend_script = "interactive_serial.py"
    return [sys.executable, get_command_path(executable=backend_script)]
406

407

408
def _wait_for_free_slots(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    refresh_rate: float = 0.01,
) -> dict:
    """
    Wait for computing resources to become available.

    Finished futures are removed from the dictionary of active tasks until the next task fits into the limit. When
    neither max_cores nor max_workers is set the dictionary is returned right away.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number cores which can be used
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores. It is only evaluated
                           when max_cores is None and then limits the number of active tasks.
        refresh_rate (float): Time in seconds to sleep between two checks while waiting for resources

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require

    Raises:
        ValueError: if the request can never be fulfilled, because more cores than max_cores are requested or
                    max_workers is smaller than one
    """
    if max_cores is not None:
        # Such a request would not even fit into an empty dictionary, waiting would never end.
        if cores_requested > max_cores:
            raise ValueError(
                "The task requests "
                + str(cores_requested)
                + " cores, but only max_cores="
                + str(max_cores)
                + " cores are available."
            )

        def limit_exceeded(tasks: dict) -> bool:
            return sum(tasks.values()) + cores_requested > max_cores

    elif max_workers is not None:
        if max_workers < 1:
            raise ValueError(
                "max_workers="
                + str(max_workers)
                + " does not allow the execution of any task."
            )

        def limit_exceeded(tasks: dict) -> bool:
            return len(tasks) + 1 > max_workers

    else:
        return active_task_dict

    while limit_exceeded(active_task_dict):
        active_task_dict = {
            k: v for k, v in active_task_dict.items() if not k.done()
        }
        # Give the running tasks time to finish instead of spinning at full CPU load.
        if limit_exceeded(active_task_dict):
            sleep(refresh_rate)
    return active_task_dict
439

440

441
def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Hand the waiting tasks whose future inputs are all done over to the executor queue.

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: the tasks with at least one unfinished future input, in their original order
    """
    still_waiting = []
    for waiting_task in wait_lst:
        inputs_done = all(dep.done() for dep in waiting_task["future_lst"])
        if not inputs_done:
            still_waiting.append(waiting_task)
            continue
        # The bookkeeping entry is removed before the task is passed on to the executor.
        del waiting_task["future_lst"]
        resolved_args, resolved_kwargs = _update_futures_in_input(
            args=waiting_task["args"], kwargs=waiting_task["kwargs"]
        )
        waiting_task["args"] = resolved_args
        waiting_task["kwargs"] = resolved_kwargs
        executor_queue.put(waiting_task)
    return still_waiting
463

464

465
def _update_futures_in_input(args: tuple, kwargs: dict) -> Tuple[tuple, dict]:
    """
    Replace the future objects in the arguments and keyword arguments with their results by calling future.result()

    Future objects are resolved when they are passed directly or inside (nested) lists, every other value is
    returned unchanged.

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def resolve(value: Union[List[Future], Future]) -> Any:
        if isinstance(value, Future):
            return value.result()
        if isinstance(value, list):
            return list(map(resolve, value))
        return value

    resolved_args = tuple(resolve(item) for item in args)
    resolved_kwargs = {name: resolve(item) for name, item in kwargs.items()}
    return resolved_args, resolved_kwargs
488

489

490
def _get_future_objects_from_input(task_dict: dict):
    """
    Collect the future objects used in the input parameters and check whether all of them are done

    Future objects are found when they are passed directly or inside (nested) lists.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag if all future objects are already done
    """

    def iter_futures(values):
        # Depth-first traversal, so the future objects are reported in the order of their appearance.
        for value in values:
            if isinstance(value, Future):
                yield value
            elif isinstance(value, list):
                yield from iter_futures(value)

    future_lst = list(iter_futures(task_dict["args"]))
    future_lst.extend(iter_futures(task_dict["kwargs"].values()))
    all_done = all(future.done() for future in future_lst)
    return future_lst, all_done
516

517

518
def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    spawner: type[BaseSpawner],
    executor_kwargs: dict,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
):
    """
    Start a dedicated thread running execute_parallel_tasks() for a single task as soon as enough resources are free

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        spawner (BaseSpawner): Interface to start process on selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this essential to be able to communicate to an
                                     Executor running on a different compute node within the same allocation. And
                                     in principle any computer should be able to resolve that their own hostname
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
                                     this look up for security reasons. So on MacOS it is required to set this
                                     option to true

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict").copy()
    # The private queue carries exactly this one task followed by the request to shut down again.
    for message in (task_dict, {"shutdown": True, "wait": True}):
        qtask.put(message)
    use_executor_cores = "cores" not in resource_dict or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    )
    if use_executor_cores:
        resource_dict["cores"] = executor_kwargs["cores"]
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=slots_required,
        max_cores=max_cores,
        max_workers=max_workers,
    )
    active_task_dict[task_dict["future"]] = slots_required
    # Precedence from low to high: executor defaults, resources of the task, fixed settings of this call.
    task_kwargs = {
        **executor_kwargs,
        **resource_dict,
        "future_queue": qtask,
        "spawner": spawner,
        "hostname_localhost": hostname_localhost,
        "init_function": None,
    }
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict
583

584

585
def _execute_task(
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
    """
    Execute the task in the task_dict by communicating it via the interface.

    The queue item is acknowledged with task_done() in every case, including a future object which was cancelled
    before the execution started, so a later future_queue.join() does not block.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.

    Raises:
        Exception: any exception raised while communicating the task is set on the future object and re-raised
                   after the interface is shut down.
    """
    f = task_dict.pop("future")
    if not f.set_running_or_notify_cancel():
        # Cancelled before it started: nothing is sent, but the queue item still has to be acknowledged.
        future_queue.task_done()
        return
    try:
        f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
    except Exception as thread_exception:
        interface.shutdown(wait=True)
        future_queue.task_done()
        f.set_exception(exception=thread_exception)
        raise thread_exception
    else:
        future_queue.task_done()
608

609

610
def _execute_task_with_cache(
    interface: SocketInterface,
    task_dict: dict,
    future_queue: queue.Queue,
    cache_directory: str,
):
    """
    Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory.

    When the cache directory already contains the output file for the task, the stored result is set on the future
    object without executing the task. Otherwise the task is executed and its output together with the runtime is
    written to the cache. The queue item is acknowledged with task_done() in every case, including a future object
    which was cancelled before the execution started.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.

    Raises:
        Exception: any exception raised while executing or caching the task is set on the future object and
                   re-raised after the interface is shut down.
    """
    from executorlib.standalone.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        fn=task_dict["fn"],
        fn_args=task_dict["args"],
        fn_kwargs=task_dict["kwargs"],
        resource_dict=task_dict.get("resource_dict", {}),
    )
    os.makedirs(cache_directory, exist_ok=True)
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    f = task_dict.pop("future")
    if not f.set_running_or_notify_cancel():
        # Cancelled before it started: setting a result would fail, but the queue item still has to
        # be acknowledged.
        future_queue.task_done()
        return
    # Check for the single file directly rather than listing the whole cache directory.
    if os.path.exists(file_name):
        _, result = get_output(file_name=file_name)
        f.set_result(result)
        future_queue.task_done()
        return
    try:
        time_start = time.time()
        result = interface.send_and_receive_dict(input_dict=task_dict)
        data_dict["output"] = result
        data_dict["runtime"] = time.time() - time_start
        dump(file_name=file_name, data_dict=data_dict)
        f.set_result(result)
    except Exception as thread_exception:
        interface.shutdown(wait=True)
        future_queue.task_done()
        f.set_exception(exception=thread_exception)
        raise thread_exception
    else:
        future_queue.task_done()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc