pyiron / executorlib / build 13119627889

03 Feb 2025 05:51PM UTC coverage: 95.996% (-0.5%) from 96.536%
Pull Request #555: Add linting (github / web-flow)
Merge 05f160496 into 5ec2a2015

75 of 85 new or added lines in 16 files covered. (88.24%)
1 existing line in 1 file now uncovered.
1079 of 1124 relevant lines covered (96.0%)
0.96 hits per line

Source File: /executorlib/interactive/shared.py (98.64% covered)
import importlib.util
import os
import queue
import sys
import time
from concurrent.futures import Future
from time import sleep
from typing import Any, Callable, Optional, Union

from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
from executorlib.standalone.command import get_command_path
from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import (
    SocketInterface,
    interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.standalone.thread import RaisingThread


class ExecutorBroker(ExecutorBase):
    def submit(
        self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
    ) -> Future:  # type: ignore
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (Callable): function to submit for execution
            args: arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }
            kwargs: keyword arguments for the submitted function

        Returns:
            Future: A Future representing the given call.
        """
        if resource_dict is None:
            resource_dict = {}
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f: Future = Future()
        if self._future_queue is not None:
            self._future_queue.put(
                {"fn": fn, "args": args, "kwargs": kwargs, "future": f}
            )
        return f

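    # A minimal usage sketch for submit() above (hypothetical values, using the
    # InteractiveExecutor subclass defined below):
    #
    # >>> with InteractiveExecutor(max_workers=1) as exe:
    # ...     fs = exe.submit(sum, [1, 2, 3])
    # ...     print(fs.result())  # blocks until the worker returns 6
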
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if self._future_queue is not None:
            if cancel_futures:
                cancel_items_in_queue(que=self._future_queue)
            if isinstance(self._process, list):
                for _ in range(len(self._process)):
                    self._future_queue.put({"shutdown": True, "wait": wait})
                if wait:
                    for process in self._process:
                        process.join()
                    self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: list[RaisingThread]):  # type: ignore
        """
        Set the list of worker threads for the executor and start them.

        Args:
            process (list[RaisingThread]): The worker threads for the executor.
        """
        self._process = process
        for process_instance in self._process:
            process_instance.start()


class InteractiveExecutor(ExecutorBroker):
    """
    The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib interfaces to distribute python
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
    executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
    the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
    improve the usability, in particular when used in combination with Jupyter notebooks.

    Args:
        max_workers (int): defines the number of workers which can execute functions in parallel
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_workers: int = 1,
        executor_kwargs: Optional[dict] = None,
        spawner: type[BaseSpawner] = MpiExecSpawner,
    ):
        if executor_kwargs is None:
            executor_kwargs = {}
        super().__init__(max_cores=executor_kwargs.get("max_cores"))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["queue_join_on_shutdown"] = False
        self._set_process(
            process=[
                RaisingThread(
                    target=execute_parallel_tasks,
                    kwargs=executor_kwargs,
                )
                for _ in range(max_workers)
            ],
        )


class InteractiveStepExecutor(ExecutorBase):
    """
    The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
    tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
    can be executed in a serial python process and does not require the python script to be executed with MPI.
    Consequently, it is primarily an abstraction of its functionality to improve the usability, in particular when used
    in combination with Jupyter notebooks.

    Args:
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveStepExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> with InteractiveStepExecutor(max_cores=2) as p:
        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: Optional[dict] = None,
        spawner: type[BaseSpawner] = MpiExecSpawner,
    ):
        if executor_kwargs is None:
            executor_kwargs = {}
        super().__init__(max_cores=executor_kwargs.get("max_cores"))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["max_cores"] = max_cores
        executor_kwargs["max_workers"] = max_workers
        self._set_process(
            RaisingThread(
                target=execute_separate_tasks,
                kwargs=executor_kwargs,
            )
        )


def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    **kwargs,
) -> None:
    """
    Execute tasks from the queue one at a time, each in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start process on selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate to an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve that its own hostname
                                     points to the same address as localhost. Still, macOS >= 12 seems to disable
                                     this lookup for security reasons, so on macOS it is required to set this
                                     option to true.
       init_function (Callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            if queue_join_on_shutdown:
                future_queue.join()
            break
        elif "fn" in task_dict and "future" in task_dict:
            if cache_directory is None:
                _execute_task(
                    interface=interface, task_dict=task_dict, future_queue=future_queue
                )
            else:
                _execute_task_with_cache(
                    interface=interface,
                    task_dict=task_dict,
                    future_queue=future_queue,
                    cache_directory=cache_directory,
                )

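# For illustration (a sketch, not part of the module): the worker loop above
# consumes two kinds of dictionaries from future_queue. A task entry carries
# the callable together with the Future used to report the result, and a
# shutdown entry terminates the loop:
#
#     {"fn": my_function, "args": (1,), "kwargs": {"j": 2}, "future": Future()}
#     {"shutdown": True, "wait": True}
#
# ExecutorBroker.submit() produces the first form; ExecutorBroker.shutdown()
# produces the second, once per worker thread.
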
def execute_separate_tasks(
    future_queue: queue.Queue,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Execute each task in a separate process, in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       spawner (BaseSpawner): Interface to start process on selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                          cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                          recommended, as computers have a limited number of compute cores.
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate to an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve that its own hostname
                                     points to the same address as localhost. Still, macOS >= 12 seems to disable
                                     this lookup for security reasons, so on macOS it is required to set this
                                     option to true.
    """
    active_task_dict: dict = {}
    process_lst: list = []
    qtask_lst: list = []
    if "cores" not in kwargs:
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict and "future" in task_dict:
            qtask: queue.Queue = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                max_workers=max_workers,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()

def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects from
    other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None and "fn" in task_dict and "future" in task_dict
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)

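# A sketch of what the dependency resolution above enables (hypothetical
# executor setup): a Future returned by one submit() call can be passed
# directly as an argument to the next task; the loop holds the dependent task
# in wait_lst until the Future is done, then substitutes its result:
#
# >>> fs_one = exe.submit(sum, [1, 1])
# >>> fs_two = exe.submit(sum, [fs_one, 1])  # waits for fs_one, then computes 3
# >>> print(fs_two.result())
# 3
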
def _get_backend_path(
    cores: int,
) -> list:
    """
    Get command to call backend as a list of two strings

    Args:
        cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        raise ImportError(
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst

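# For illustration, the returned command list looks roughly like
# [sys.executable, "/path/to/executorlib/backend/interactive_serial.py"] for
# cores=1, with interactive_parallel.py selected instead when cores > 1 and
# mpi4py is importable (the exact script location is resolved by
# get_command_path).
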
def _wait_for_free_slots(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
    """
    if max_cores is not None:
        while sum(active_task_dict.values()) + cores_requested > max_cores:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    elif max_workers is not None and max_cores is None:
        while len(active_task_dict.values()) + 1 > max_workers:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    return active_task_dict

def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list:
    """
    Submit the waiting tasks whose future inputs have completed to the executor

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose future inputs have not yet completed
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all(future.done() for future in task_wait_dict["future_lst"]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst

def _update_futures_in_input(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def get_result(arg: Union[list[Future], Future]) -> Any:
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = tuple([get_result(arg=arg) for arg in args])
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs

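# A small illustration (sketch): _update_futures_in_input replaces futures by
# their results, also inside nested lists, and passes everything else through:
#
# >>> done = Future()
# >>> done.set_result(2)
# >>> _update_futures_in_input(args=(1, [done]), kwargs={"j": done})
# ((1, [2]), {'j': 2})
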
def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and whether these future objects are already done

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag if all future objects are already done
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag

def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    spawner: type[BaseSpawner],
    executor_kwargs: dict,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
):
    """
    Submit function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        spawner (BaseSpawner): Interface to start process on selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve that its own hostname
                                      points to the same address as localhost. Still, macOS >= 12 seems to disable
                                      this lookup for security reasons, so on macOS it is required to set this
                                      option to true.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict").copy()
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=slots_required,
        max_cores=max_cores,
        max_workers=max_workers,
    )
    active_task_dict[task_dict["future"]] = slots_required
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "spawner": spawner,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict

def _execute_task(
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
    """
    Execute the task in the task_dict by communicating it via the interface.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
    """
    f = task_dict.pop("future")
    if f.set_running_or_notify_cancel():
        try:
            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
        except Exception as thread_exception:
            interface.shutdown(wait=True)
            future_queue.task_done()
            f.set_exception(exception=thread_exception)
            raise thread_exception
        else:
            future_queue.task_done()

def _execute_task_with_cache(
    interface: SocketInterface,
    task_dict: dict,
    future_queue: queue.Queue,
    cache_directory: str,
):
    """
    Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.
    """
    from executorlib.standalone.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        fn=task_dict["fn"],
        fn_args=task_dict["args"],
        fn_kwargs=task_dict["kwargs"],
        resource_dict=task_dict.get("resource_dict", {}),
    )
    os.makedirs(cache_directory, exist_ok=True)
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    if task_key + ".h5out" not in os.listdir(cache_directory):
        f = task_dict.pop("future")
        if f.set_running_or_notify_cancel():
            try:
                time_start = time.time()
                result = interface.send_and_receive_dict(input_dict=task_dict)
                data_dict["output"] = result
                data_dict["runtime"] = time.time() - time_start
                dump(file_name=file_name, data_dict=data_dict)
                f.set_result(result)
            except Exception as thread_exception:
                interface.shutdown(wait=True)
                future_queue.task_done()
                f.set_exception(exception=thread_exception)
                raise thread_exception
            else:
                future_queue.task_done()
    else:
        _, result = get_output(file_name=file_name)
        future = task_dict["future"]
        future.set_result(result)
        future_queue.task_done()
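

# For illustration (a sketch of the caching behaviour above): each task is
# hashed by serialize_funct_h5 into a task_key, and its result is stored as
# "<cache_directory>/<task_key>.h5out". Resubmitting an identical task reads
# the stored result back instead of re-executing the function:
#
# >>> with InteractiveExecutor(
# ...     max_workers=1, executor_kwargs={"cache_directory": "cache"}
# ... ) as exe:
# ...     print(exe.submit(sum, [1, 2]).result())  # computed and dumped
# ...     print(exe.submit(sum, [1, 2]).result())  # read back from cache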