• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 13118649262

03 Feb 2025 04:56PM UTC coverage: 95.996% (-0.5%) from 96.536%
13118649262

Pull #555

github

web-flow
Merge 2f06a3fcd into 5ec2a2015
Pull Request #555: Add linting

76 of 83 new or added lines in 16 files covered. (91.57%)

4 existing lines in 2 files now uncovered.

1079 of 1124 relevant lines covered (96.0%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.64
/executorlib/interactive/shared.py
1
import importlib.util
1✔
2
import os
1✔
3
import queue
1✔
4
import sys
1✔
5
import time
1✔
6
from concurrent.futures import Future
1✔
7
from time import sleep
1✔
8
from typing import Any, Callable, Optional, Union
1✔
9

10
from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
1✔
11
from executorlib.standalone.command import get_command_path
1✔
12
from executorlib.standalone.inputcheck import (
1✔
13
    check_resource_dict,
14
    check_resource_dict_is_empty,
15
)
16
from executorlib.standalone.interactive.communication import (
1✔
17
    SocketInterface,
18
    interface_bootup,
19
)
20
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
1✔
21
from executorlib.standalone.serialize import serialize_funct_h5
1✔
22
from executorlib.standalone.thread import RaisingThread
1✔
23

24

25
class ExecutorBroker(ExecutorBase):
1✔
26
    def submit(self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs) -> Future:  # type: ignore
1✔
27
        """
28
        Submits a callable to be executed with the given arguments.
29

30
        Schedules the callable to be executed as fn(*args, **kwargs) and returns
31
        a Future instance representing the execution of the callable.
32

33
        Args:
34
            fn (Callable): function to submit for execution
35
            args: arguments for the submitted function
36
            kwargs: keyword arguments for the submitted function
37
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
38
                                  function. Example resource dictionary: {
39
                                      cores: 1,
40
                                      threads_per_core: 1,
41
                                      gpus_per_worker: 0,
42
                                      oversubscribe: False,
43
                                      cwd: None,
44
                                      executor: None,
45
                                      hostname_localhost: False,
46
                                  }
47

48
        Returns:
49
            Future: A Future representing the given call.
50
        """
51
        if resource_dict is None:
1✔
52
            resource_dict = {}
1✔
53
        check_resource_dict_is_empty(resource_dict=resource_dict)
1✔
54
        check_resource_dict(function=fn)
1✔
55
        f: Future = Future()
1✔
56
        if self._future_queue is not None:
1✔
57
            self._future_queue.put(
1✔
58
                {"fn": fn, "args": args, "kwargs": kwargs, "future": f}
59
            )
60
        return f
1✔
61

62
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
1✔
63
        """Clean-up the resources associated with the Executor.
64

65
        It is safe to call this method several times. Otherwise, no other
66
        methods can be called after this one.
67

68
        Args:
69
            wait: If True then shutdown will not return until all running
70
                futures have finished executing and the resources used by the
71
                parallel_executors have been reclaimed.
72
            cancel_futures: If True then shutdown will cancel all pending
73
                futures. Futures that are completed or running will not be
74
                cancelled.
75
        """
76
        if self._future_queue is not None:
1✔
77
            if cancel_futures:
1✔
78
                cancel_items_in_queue(que=self._future_queue)
1✔
79
            if isinstance(self._process, list):
1✔
80
                for _ in range(len(self._process)):
1✔
81
                    self._future_queue.put({"shutdown": True, "wait": wait})
1✔
82
                if wait:
1✔
83
                    for process in self._process:
1✔
84
                        process.join()
1✔
85
                    self._future_queue.join()
1✔
86
        self._process = None
1✔
87
        self._future_queue = None
1✔
88

89
    def _set_process(self, process: list[RaisingThread]):  # type: ignore
1✔
90
        """
91
        Set the process for the executor.
92

93
        Args:
94
            process (List[RaisingThread]): The process for the executor.
95
        """
96
        self._process = process
1✔
97
        for process_instance in self._process:
1✔
98
            process_instance.start()
1✔
99

100

101
class InteractiveExecutor(ExecutorBroker):
1✔
102
    """
103
    The executorlib.interactive.executor.InteractiveExecutor leverages the exeutorlib interfaces to distribute python
104
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
105
    executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
106
    the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
107
    improves the usability in particular when used in combination with Jupyter notebooks.
108

109
    Args:
110
        max_workers (int): defines the number workers which can execute functions in parallel
111
        executor_kwargs (dict): keyword arguments for the executor
112
        spawner (BaseSpawner): interface class to initiate python processes
113

114
    Examples:
115

116
        >>> import numpy as np
117
        >>> from executorlib.interactive.executor import InteractiveExecutor
118
        >>>
119
        >>> def calc(i, j, k):
120
        >>>     from mpi4py import MPI
121
        >>>     size = MPI.COMM_WORLD.Get_size()
122
        >>>     rank = MPI.COMM_WORLD.Get_rank()
123
        >>>     return np.array([i, j, k]), size, rank
124
        >>>
125
        >>> def init_k():
126
        >>>     return {"k": 3}
127
        >>>
128
        >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
129
        >>>     fs = p.submit(calc, 2, j=4)
130
        >>>     print(fs.result())
131
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
132

133
    """
134

135
    def __init__(
1✔
136
        self,
137
        max_workers: int = 1,
138
        executor_kwargs: Optional[dict] = None,
139
        spawner: type[BaseSpawner] = MpiExecSpawner,
140
    ):
141
        if executor_kwargs is None:
1✔
UNCOV
142
            executor_kwargs = {}
×
143
        super().__init__(max_cores=executor_kwargs.get("max_cores"))
1✔
144
        executor_kwargs["future_queue"] = self._future_queue
1✔
145
        executor_kwargs["spawner"] = spawner
1✔
146
        executor_kwargs["queue_join_on_shutdown"] = False
1✔
147
        self._set_process(
1✔
148
            process=[
149
                RaisingThread(
150
                    target=execute_parallel_tasks,
151
                    kwargs=executor_kwargs,
152
                )
153
                for _ in range(max_workers)
154
            ],
155
        )
156

157

158
class InteractiveStepExecutor(ExecutorBase):
1✔
159
    """
160
    The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
161
    tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
162
    can be executed in a serial python process and does not require the python script to be executed with MPI.
163
    Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used
164
    in combination with Jupyter notebooks.
165

166
    Args:
167
        max_cores (int): defines the number workers which can execute functions in parallel
168
        executor_kwargs (dict): keyword arguments for the executor
169
        spawner (BaseSpawner): interface class to initiate python processes
170

171
    Examples:
172

173
        >>> import numpy as np
174
        >>> from executorlib.interactive.executor import InteractiveStepExecutor
175
        >>>
176
        >>> def calc(i, j, k):
177
        >>>     from mpi4py import MPI
178
        >>>     size = MPI.COMM_WORLD.Get_size()
179
        >>>     rank = MPI.COMM_WORLD.Get_rank()
180
        >>>     return np.array([i, j, k]), size, rank
181
        >>>
182
        >>> with PyFluxStepExecutor(max_cores=2) as p:
183
        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
184
        >>>     print(fs.result())
185

186
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
187

188
    """
189

190
    def __init__(
1✔
191
        self,
192
        max_cores: Optional[int] = None,
193
        max_workers: Optional[int] = None,
194
        executor_kwargs: Optional[dict] = None,
195
        spawner: type[BaseSpawner] = MpiExecSpawner,
196
    ):
197
        if executor_kwargs is None:
1✔
UNCOV
198
            executor_kwargs = {}
×
199
        super().__init__(max_cores=executor_kwargs.get("max_cores"))
1✔
200
        executor_kwargs["future_queue"] = self._future_queue
1✔
201
        executor_kwargs["spawner"] = spawner
1✔
202
        executor_kwargs["max_cores"] = max_cores
1✔
203
        executor_kwargs["max_workers"] = max_workers
1✔
204
        self._set_process(
1✔
205
            RaisingThread(
206
                target=execute_separate_tasks,
207
                kwargs=executor_kwargs,
208
            )
209
        )
210

211

212
def execute_parallel_tasks(
1✔
213
    future_queue: queue.Queue,
214
    cores: int = 1,
215
    spawner: type[BaseSpawner] = MpiExecSpawner,
216
    hostname_localhost: Optional[bool] = None,
217
    init_function: Optional[Callable] = None,
218
    cache_directory: Optional[str] = None,
219
    queue_join_on_shutdown: bool = True,
220
    **kwargs,
221
) -> None:
222
    """
223
    Execute a single tasks in parallel using the message passing interface (MPI).
224

225
    Args:
226
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
227
       cores (int): defines the total number of MPI ranks to use
228
       spawner (BaseSpawner): Spawner to start process on selected compute resources
229
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
230
                                     context of an HPC cluster this essential to be able to communicate to an
231
                                     Executor running on a different compute node within the same allocation. And
232
                                     in principle any computer should be able to resolve that their own hostname
233
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
234
                                     this look up for security reasons. So on MacOS it is required to set this
235
                                     option to true
236
       init_function (Callable): optional function to preset arguments for functions which are submitted later
237
       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
238
       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
239
    """
240
    interface = interface_bootup(
1✔
241
        command_lst=_get_backend_path(
242
            cores=cores,
243
        ),
244
        connections=spawner(cores=cores, **kwargs),
245
        hostname_localhost=hostname_localhost,
246
    )
247
    if init_function is not None:
1✔
248
        interface.send_dict(
1✔
249
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
250
        )
251
    while True:
1✔
252
        task_dict = future_queue.get()
1✔
253
        if "shutdown" in task_dict and task_dict["shutdown"]:
1✔
254
            interface.shutdown(wait=task_dict["wait"])
1✔
255
            future_queue.task_done()
1✔
256
            if queue_join_on_shutdown:
1✔
257
                future_queue.join()
1✔
258
            break
1✔
259
        elif "fn" in task_dict and "future" in task_dict:
1✔
260
            if cache_directory is None:
1✔
261
                _execute_task(
1✔
262
                    interface=interface, task_dict=task_dict, future_queue=future_queue
263
                )
264
            else:
265
                _execute_task_with_cache(
1✔
266
                    interface=interface,
267
                    task_dict=task_dict,
268
                    future_queue=future_queue,
269
                    cache_directory=cache_directory,
270
                )
271

272

273
def execute_separate_tasks(
1✔
274
    future_queue: queue.Queue,
275
    spawner: type[BaseSpawner] = MpiExecSpawner,
276
    max_cores: Optional[int] = None,
277
    max_workers: Optional[int] = None,
278
    hostname_localhost: Optional[bool] = None,
279
    **kwargs,
280
):
281
    """
282
    Execute a single tasks in parallel using the message passing interface (MPI).
283

284
    Args:
285
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
286
       spawner (BaseSpawner): Interface to start process on selected compute resources
287
       max_cores (int): defines the number cores which can be used in parallel
288
       max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
289
                          cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
290
                          recommended, as computers have a limited number of compute cores.
291
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
292
                                     context of an HPC cluster this essential to be able to communicate to an
293
                                     Executor running on a different compute node within the same allocation. And
294
                                     in principle any computer should be able to resolve that their own hostname
295
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
296
                                     this look up for security reasons. So on MacOS it is required to set this
297
                                     option to true
298
    """
299
    active_task_dict: dict = {}
1✔
300
    process_lst: list = []
1✔
301
    qtask_lst: list = []
1✔
302
    if "cores" not in kwargs:
1✔
303
        kwargs["cores"] = 1
1✔
304
    while True:
1✔
305
        task_dict = future_queue.get()
1✔
306
        if "shutdown" in task_dict and task_dict["shutdown"]:
1✔
307
            if task_dict["wait"]:
1✔
308
                _ = [process.join() for process in process_lst]
1✔
309
            future_queue.task_done()
1✔
310
            future_queue.join()
1✔
311
            break
1✔
312
        elif "fn" in task_dict and "future" in task_dict:
1✔
313
            qtask: queue.Queue = queue.Queue()
1✔
314
            process, active_task_dict = _submit_function_to_separate_process(
1✔
315
                task_dict=task_dict,
316
                qtask=qtask,
317
                active_task_dict=active_task_dict,
318
                spawner=spawner,
319
                executor_kwargs=kwargs,
320
                max_cores=max_cores,
321
                max_workers=max_workers,
322
                hostname_localhost=hostname_localhost,
323
            )
324
            qtask_lst.append(qtask)
1✔
325
            process_lst.append(process)
1✔
326
            future_queue.task_done()
1✔
327

328

329
def execute_tasks_with_dependencies(
1✔
330
    future_queue: queue.Queue,
331
    executor_queue: queue.Queue,
332
    executor: ExecutorBase,
333
    refresh_rate: float = 0.01,
334
):
335
    """
336
    Resolve the dependencies of multiple tasks, by analysing which task requires concurrent.future.Futures objects from
337
    other tasks.
338

339
    Args:
340
        future_queue (Queue): Queue for receiving new tasks.
341
        executor_queue (Queue): Queue for the internal executor.
342
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
343
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
344
    """
345
    wait_lst = []
1✔
346
    while True:
1✔
347
        try:
1✔
348
            task_dict = future_queue.get_nowait()
1✔
349
        except queue.Empty:
1✔
350
            task_dict = None
1✔
351
        if (  # shutdown the executor
1✔
352
            task_dict is not None
353
            and "shutdown" in task_dict
354
            and task_dict["shutdown"]
355
        ):
356
            executor.shutdown(wait=task_dict["wait"])
1✔
357
            future_queue.task_done()
1✔
358
            future_queue.join()
1✔
359
            break
1✔
360
        elif (  # handle function submitted to the executor
1✔
361
            task_dict is not None
362
            and "fn" in task_dict
363
            and "future" in task_dict
364
        ):
365
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
1✔
366
            if len(future_lst) == 0 or ready_flag:
1✔
367
                # No future objects are used in the input or all future objects are already done
368
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
1✔
369
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
370
                )
371
                executor_queue.put(task_dict)
1✔
372
            else:  # Otherwise add the function to the wait list
373
                task_dict["future_lst"] = future_lst
1✔
374
                wait_lst.append(task_dict)
1✔
375
            future_queue.task_done()
1✔
376
        elif len(wait_lst) > 0:
1✔
377
            number_waiting = len(wait_lst)
1✔
378
            # Check functions in the wait list and execute them if all future objects are now ready
379
            wait_lst = _submit_waiting_task(
1✔
380
                wait_lst=wait_lst, executor_queue=executor_queue
381
            )
382
            # if no job is ready, sleep for a moment
383
            if len(wait_lst) == number_waiting:
1✔
384
                sleep(refresh_rate)
1✔
385
        else:
386
            # If there is nothing else to do, sleep for a moment
387
            sleep(refresh_rate)
1✔
388

389

390
def _get_backend_path(
1✔
391
    cores: int,
392
) -> list:
393
    """
394
    Get command to call backend as a list of two strings
395

396
    Args:
397
        cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py
398

399
    Returns:
400
        list[str]: List of strings containing the python executable path and the backend script to execute
401
    """
402
    command_lst = [sys.executable]
1✔
403
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
1✔
404
        command_lst += [get_command_path(executable="interactive_parallel.py")]
1✔
405
    elif cores > 1:
1✔
UNCOV
406
        raise ImportError(
×
407
            "mpi4py is required for parallel calculations. Please install mpi4py."
408
        )
409
    else:
410
        command_lst += [get_command_path(executable="interactive_serial.py")]
1✔
411
    return command_lst
1✔
412

413

414
def _wait_for_free_slots(
1✔
415
    active_task_dict: dict,
416
    cores_requested: int,
417
    max_cores: Optional[int] = None,
418
    max_workers: Optional[int] = None,
419
) -> dict:
420
    """
421
    Wait for available computing resources to become available.
422

423
    Args:
424
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
425
        cores_requested (int): Number of cores required for executing the next task
426
        max_cores (int): Maximum number cores which can be used
427
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
428
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
429
                           recommended, as computers have a limited number of compute cores.
430

431
    Returns:
432
        dict: Dictionary containing the future objects and the number of cores they require
433
    """
434
    if max_cores is not None:
1✔
435
        while sum(active_task_dict.values()) + cores_requested > max_cores:
1✔
436
            active_task_dict = {
1✔
437
                k: v for k, v in active_task_dict.items() if not k.done()
438
            }
439
    elif max_workers is not None and max_cores is None:
1✔
440
        while len(active_task_dict.values()) + 1 > max_workers:
1✔
441
            active_task_dict = {
1✔
442
                k: v for k, v in active_task_dict.items() if not k.done()
443
            }
444
    return active_task_dict
1✔
445

446

447
def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list:
1✔
448
    """
449
    Submit the waiting tasks, which future inputs have been completed, to the executor
450

451
    Args:
452
        wait_lst (list): List of waiting tasks
453
        executor_queue (Queue): Queue of the internal executor
454

455
    Returns:
456
        list: list tasks which future inputs have not been completed
457
    """
458
    wait_tmp_lst = []
1✔
459
    for task_wait_dict in wait_lst:
1✔
460
        if all(future.done() for future in task_wait_dict["future_lst"]):
1✔
461
            del task_wait_dict["future_lst"]
1✔
462
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
1✔
463
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
464
            )
465
            executor_queue.put(task_wait_dict)
1✔
466
        else:
467
            wait_tmp_lst.append(task_wait_dict)
1✔
468
    return wait_tmp_lst
1✔
469

470

471
def _update_futures_in_input(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
1✔
472
    """
473
    Evaluate future objects in the arguments and keyword arguments by calling future.result()
474

475
    Args:
476
        args (tuple): function arguments
477
        kwargs (dict): function keyword arguments
478

479
    Returns:
480
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
481
    """
482

483
    def get_result(arg: Union[list[Future], Future]) -> Any:
1✔
484
        if isinstance(arg, Future):
1✔
485
            return arg.result()
1✔
486
        elif isinstance(arg, list):
1✔
487
            return [get_result(arg=el) for el in arg]
1✔
488
        else:
489
            return arg
1✔
490

491
    args = tuple([get_result(arg=arg) for arg in args])
1✔
492
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
1✔
493
    return args, kwargs
1✔
494

495

496
def _get_future_objects_from_input(task_dict: dict):
1✔
497
    """
498
    Check the input parameters if they contain future objects and which of these future objects are executed
499

500
    Args:
501
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
502
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
503

504
    Returns:
505
        list, boolean: list of future objects and boolean flag if all future objects are already done
506
    """
507
    future_lst = []
1✔
508

509
    def find_future_in_list(lst):
1✔
510
        for el in lst:
1✔
511
            if isinstance(el, Future):
1✔
512
                future_lst.append(el)
1✔
513
            elif isinstance(el, list):
1✔
514
                find_future_in_list(lst=el)
1✔
515

516
    find_future_in_list(lst=task_dict["args"])
1✔
517
    find_future_in_list(lst=task_dict["kwargs"].values())
1✔
518
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
1✔
519
        future_lst
520
    )
521
    return future_lst, boolean_flag
1✔
522

523

524
def _submit_function_to_separate_process(
1✔
525
    task_dict: dict,
526
    active_task_dict: dict,
527
    qtask: queue.Queue,
528
    spawner: type[BaseSpawner],
529
    executor_kwargs: dict,
530
    max_cores: Optional[int] = None,
531
    max_workers: Optional[int] = None,
532
    hostname_localhost: Optional[bool] = None,
533
):
534
    """
535
    Submit function to be executed in separate Python process
536
    Args:
537
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
538
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
539
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
540
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
541
        spawner (BaseSpawner): Interface to start process on selected compute resources
542
        executor_kwargs (dict): keyword parameters used to initialize the Executor
543
        max_cores (int): defines the number cores which can be used in parallel
544
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
545
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
546
                           recommended, as computers have a limited number of compute cores.
547
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
548
                                     context of an HPC cluster this essential to be able to communicate to an
549
                                     Executor running on a different compute node within the same allocation. And
550
                                     in principle any computer should be able to resolve that their own hostname
551
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
552
                                     this look up for security reasons. So on MacOS it is required to set this
553
                                     option to true
554
    Returns:
555
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
556
                             dictionary containing the future objects and the number of cores they require
557
    """
558
    resource_dict = task_dict.pop("resource_dict").copy()
1✔
559
    qtask.put(task_dict)
1✔
560
    qtask.put({"shutdown": True, "wait": True})
1✔
561
    if "cores" not in resource_dict or (
1✔
562
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
563
    ):
564
        resource_dict["cores"] = executor_kwargs["cores"]
1✔
565
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
1✔
566
    active_task_dict = _wait_for_free_slots(
1✔
567
        active_task_dict=active_task_dict,
568
        cores_requested=slots_required,
569
        max_cores=max_cores,
570
        max_workers=max_workers,
571
    )
572
    active_task_dict[task_dict["future"]] = slots_required
1✔
573
    task_kwargs = executor_kwargs.copy()
1✔
574
    task_kwargs.update(resource_dict)
1✔
575
    task_kwargs.update(
1✔
576
        {
577
            "future_queue": qtask,
578
            "spawner": spawner,
579
            "hostname_localhost": hostname_localhost,
580
            "init_function": None,
581
        }
582
    )
583
    process = RaisingThread(
1✔
584
        target=execute_parallel_tasks,
585
        kwargs=task_kwargs,
586
    )
587
    process.start()
1✔
588
    return process, active_task_dict
1✔
589

590

591
def _execute_task(
1✔
592
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
593
):
594
    """
595
    Execute the task in the task_dict by communicating it via the interface.
596

597
    Args:
598
        interface (SocketInterface): socket interface for zmq communication
599
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
600
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
601
        future_queue (Queue): Queue for receiving new tasks.
602
    """
603
    f = task_dict.pop("future")
1✔
604
    if f.set_running_or_notify_cancel():
1✔
605
        try:
1✔
606
            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
1✔
607
        except Exception as thread_exception:
1✔
608
            interface.shutdown(wait=True)
1✔
609
            future_queue.task_done()
1✔
610
            f.set_exception(exception=thread_exception)
1✔
611
            raise thread_exception
1✔
612
        else:
613
            future_queue.task_done()
1✔
614

615

616
def _execute_task_with_cache(
1✔
617
    interface: SocketInterface,
618
    task_dict: dict,
619
    future_queue: queue.Queue,
620
    cache_directory: str,
621
):
622
    """
623
    Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory.
624

625
    Args:
626
        interface (SocketInterface): socket interface for zmq communication
627
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
628
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
629
        future_queue (Queue): Queue for receiving new tasks.
630
        cache_directory (str): The directory to store cache files.
631
    """
632
    from executorlib.standalone.hdf import dump, get_output
1✔
633

634
    task_key, data_dict = serialize_funct_h5(
1✔
635
        fn=task_dict["fn"],
636
        fn_args=task_dict["args"],
637
        fn_kwargs=task_dict["kwargs"],
638
        resource_dict=task_dict.get("resource_dict", {}),
639
    )
640
    os.makedirs(cache_directory, exist_ok=True)
1✔
641
    file_name = os.path.join(cache_directory, task_key + ".h5out")
1✔
642
    if task_key + ".h5out" not in os.listdir(cache_directory):
1✔
643
        f = task_dict.pop("future")
1✔
644
        if f.set_running_or_notify_cancel():
1✔
645
            try:
1✔
646
                time_start = time.time()
1✔
647
                result = interface.send_and_receive_dict(input_dict=task_dict)
1✔
648
                data_dict["output"] = result
1✔
649
                data_dict["runtime"] = time.time() - time_start
1✔
650
                dump(file_name=file_name, data_dict=data_dict)
1✔
651
                f.set_result(result)
1✔
652
            except Exception as thread_exception:
1✔
653
                interface.shutdown(wait=True)
1✔
654
                future_queue.task_done()
1✔
655
                f.set_exception(exception=thread_exception)
1✔
656
                raise thread_exception
1✔
657
            else:
658
                future_queue.task_done()
1✔
659
    else:
660
        _, result = get_output(file_name=file_name)
1✔
661
        future = task_dict["future"]
1✔
662
        future.set_result(result)
1✔
663
        future_queue.task_done()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc