pyiron / executorlib / build 12483232754

24 Dec 2024 02:59PM UTC coverage: 96.004% (-0.4%) from 96.367%

Pull Request #535: Add type checking with mypy
Merge 1607bb008 into 5cf3ecc7e

117 of 124 new or added lines in 15 files covered. (94.35%)
7 existing lines in 4 files now uncovered.
1033 of 1076 relevant lines covered (96.0%)
0.96 hits per line

Source File: /executorlib/interactive/shared.py (99.53% covered)
import importlib.util
import os
import queue
import sys
import time
from concurrent.futures import Future
from time import sleep
from typing import Any, Callable, List, Optional, Tuple, Union

from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
from executorlib.standalone.command import get_command_path
from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import (
    SocketInterface,
    interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.standalone.thread import RaisingThread


class ExecutorBroker(ExecutorBase):
    def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future:  # type: ignore
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (Callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f: Future = Future()
        if self._future_queue is not None:
            self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if self._future_queue is not None:
            if cancel_futures:
                cancel_items_in_queue(que=self._future_queue)
            if isinstance(self._process, list):
                for _ in range(len(self._process)):
                    self._future_queue.put({"shutdown": True, "wait": wait})
                if wait:
                    for process in self._process:
                        process.join()
                    self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):  # type: ignore
        """
        Set the process for the executor.

        Args:
            process (List[RaisingThread]): The process for the executor.
        """
        self._process = process
        for process_instance in self._process:
            process_instance.start()


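# A brief sketch of the queue protocol defined above (hypothetical values, not
# part of the original module): submit() enqueues one dictionary per call and
# shutdown() enqueues one sentinel per worker thread, which the worker loops
# further below consume:
#
#     >>> import queue
#     >>> from concurrent.futures import Future
#     >>> q: queue.Queue = queue.Queue()
#     >>> f: Future = Future()
#     >>> q.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": f})  # submit()
#     >>> q.put({"shutdown": True, "wait": True})  # shutdown()

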
class InteractiveExecutor(ExecutorBroker):
    """
    The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib interfaces to distribute python
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
    executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
    the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
    improve the usability in particular when used in combination with Jupyter notebooks.

    Args:
        max_workers (int): defines the number of workers which can execute functions in parallel
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveExecutor
        >>>
        >>> def calc(i, j, k):
        ...     from mpi4py import MPI
        ...     size = MPI.COMM_WORLD.Get_size()
        ...     rank = MPI.COMM_WORLD.Get_rank()
        ...     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        ...     return {"k": 3}
        >>>
        >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
        ...     fs = p.submit(calc, 2, j=4)
        ...     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_workers: int = 1,
        executor_kwargs: dict = {},
        spawner: type[BaseSpawner] = MpiExecSpawner,
    ):
        super().__init__(max_cores=executor_kwargs.get("max_cores", None))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        self._set_process(
            process=[
                RaisingThread(
                    target=execute_parallel_tasks,
                    kwargs=executor_kwargs,
                )
                for _ in range(max_workers)
            ],
        )


class InteractiveStepExecutor(ExecutorBase):
    """
    The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
    tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
    can be executed in a serial python process and does not require the python script to be executed with MPI.
    Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used
    in combination with Jupyter notebooks.

    Args:
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveStepExecutor
        >>>
        >>> def calc(i, j, k):
        ...     from mpi4py import MPI
        ...     size = MPI.COMM_WORLD.Get_size()
        ...     rank = MPI.COMM_WORLD.Get_rank()
        ...     return np.array([i, j, k]), size, rank
        >>>
        >>> with InteractiveStepExecutor(max_cores=2) as p:
        ...     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
        ...     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: dict = {},
        spawner: type[BaseSpawner] = MpiExecSpawner,
    ):
        super().__init__(max_cores=executor_kwargs.get("max_cores", None))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["max_cores"] = max_cores
        executor_kwargs["max_workers"] = max_workers
        self._set_process(
            RaisingThread(
                target=execute_separate_tasks,
                kwargs=executor_kwargs,
            )
        )


def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Execute tasks from the queue, one at a time, in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start process on selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but macOS >= 12 seems to disable this lookup for security
                                     reasons, so on macOS this option has to be set to True.
       init_function (Callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            if cache_directory is None:
                _execute_task(
                    interface=interface, task_dict=task_dict, future_queue=future_queue
                )
            else:
                _execute_task_with_cache(
                    interface=interface,
                    task_dict=task_dict,
                    future_queue=future_queue,
                    cache_directory=cache_directory,
                )


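# A hedged sketch of driving execute_parallel_tasks() by hand (normally the
# executor classes above wire this up); it assumes the default MpiExecSpawner
# can launch local processes, and the queue items mirror exactly what
# ExecutorBroker.submit() and ExecutorBroker.shutdown() enqueue:
#
#     >>> import queue
#     >>> from concurrent.futures import Future
#     >>> task_queue: queue.Queue = queue.Queue()
#     >>> f: Future = Future()
#     >>> task_queue.put({"fn": max, "args": ((1, 5, 3),), "kwargs": {}, "future": f})
#     >>> task_queue.put({"shutdown": True, "wait": True})
#     >>> worker = RaisingThread(
#     ...     target=execute_parallel_tasks,
#     ...     kwargs={"future_queue": task_queue, "cores": 1},
#     ... )
#     >>> worker.start()
#     >>> print(f.result())  # 5, computed in the spawned backend process
#     >>> worker.join()

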
def execute_separate_tasks(
    future_queue: queue.Queue,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Execute each task in a separate process, using the message passing interface (MPI) for parallel tasks.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       spawner (BaseSpawner): Interface to start process on selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                          cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                          recommended, as computers have a limited number of compute cores.
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but macOS >= 12 seems to disable this lookup for security
                                     reasons, so on macOS this option has to be set to True.
    """
    active_task_dict: dict = {}
    process_lst: list = []
    qtask_lst: list = []
    if "cores" not in kwargs.keys():
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            qtask: queue.Queue = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                max_workers=max_workers,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()


def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects from
    other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None
            and "fn" in task_dict.keys()
            and "future" in task_dict.keys()
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)


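# A hedged sketch of what this dependency loop enables at the user level
# (hypothetical executor `exe` wired to feed future_queue and executor_queue):
# a Future returned by one submit() call can be passed as an argument to a
# later call, which is parked in wait_lst until its inputs are done:
#
#     >>> fs1 = exe.submit(sum, [1, 1])    # first task, Future for 2
#     >>> fs2 = exe.submit(sum, [fs1, 1])  # held back until fs1.done()
#     >>> print(fs2.result())              # 3, after fs1 resolved to 2

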
def _get_backend_path(
    cores: int,
) -> list:
    """
    Get the command to call the backend as a list of two strings.

    Args:
        cores (int): Number of cores used to execute the task. If it is greater than one, interactive_parallel.py is
                     used as the backend, otherwise interactive_serial.py is used.

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        # Marked as uncovered (×) in this coverage run.
        raise ImportError(
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst


def _wait_for_free_slots(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
    """
    if max_cores is not None:
        while sum(active_task_dict.values()) + cores_requested > max_cores:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    elif max_workers is not None and max_cores is None:
        while len(active_task_dict.values()) + 1 > max_workers:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    return active_task_dict


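# Worked illustration of the slot bookkeeping above (hypothetical futures f1
# and f2; active_task_dict maps each running task's Future to the slots it
# occupies, and finished futures are pruned until the request fits):
#
#     >>> from concurrent.futures import Future
#     >>> f1: Future = Future()
#     >>> f1.set_result(None)    # finished task holding 2 slots
#     >>> f2: Future = Future()  # still running, also 2 slots
#     >>> result = _wait_for_free_slots(
#     ...     active_task_dict={f1: 2, f2: 2}, cores_requested=2, max_cores=4
#     ... )
#     >>> result == {f2: 2}  # the finished future f1 was pruned
#     True

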
def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Submit the waiting tasks whose future inputs have been completed to the executor.

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose future inputs have not yet been completed
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all([future.done() for future in task_wait_dict["future_lst"]]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst


def _update_futures_in_input(args: tuple, kwargs: dict) -> Tuple[tuple, dict]:
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def get_result(arg: Union[List[Future], Future]) -> Any:
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = tuple([get_result(arg=arg) for arg in args])
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs


def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and whether those future objects are already done.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag if all future objects are already done
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag


def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    spawner: type[BaseSpawner],
    executor_kwargs: dict,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
):
    """
    Submit a function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        spawner (BaseSpawner): Interface to start process on selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                           recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve its own hostname to the same
                                      address as localhost, but macOS >= 12 seems to disable this lookup for security
                                      reasons, so on macOS this option has to be set to True.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict").copy()
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict.keys() or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=slots_required,
        max_cores=max_cores,
        max_workers=max_workers,
    )
    active_task_dict[task_dict["future"]] = slots_required
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "spawner": spawner,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict


def _execute_task(
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
    """
    Execute the task in the task_dict by communicating it via the interface.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
    """
    f = task_dict.pop("future")
    if f.set_running_or_notify_cancel():
        try:
            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
        except Exception as thread_exception:
            interface.shutdown(wait=True)
            future_queue.task_done()
            f.set_exception(exception=thread_exception)
            raise thread_exception
        else:
            future_queue.task_done()


def _execute_task_with_cache(
    interface: SocketInterface,
    task_dict: dict,
    future_queue: queue.Queue,
    cache_directory: str,
):
    """
    Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.
    """
    from executorlib.standalone.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        fn=task_dict["fn"],
        fn_args=task_dict["args"],
        fn_kwargs=task_dict["kwargs"],
        resource_dict=task_dict.get("resource_dict", {}),
    )
    os.makedirs(cache_directory, exist_ok=True)
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    if task_key + ".h5out" not in os.listdir(cache_directory):
        f = task_dict.pop("future")
        if f.set_running_or_notify_cancel():
            try:
                time_start = time.time()
                result = interface.send_and_receive_dict(input_dict=task_dict)
                data_dict["output"] = result
                data_dict["runtime"] = time.time() - time_start
                dump(file_name=file_name, data_dict=data_dict)
                f.set_result(result)
            except Exception as thread_exception:
                interface.shutdown(wait=True)
                future_queue.task_done()
                f.set_exception(exception=thread_exception)
                raise thread_exception
            else:
                future_queue.task_done()
    else:
        _, result = get_output(file_name=file_name)
        future = task_dict["future"]
        future.set_result(result)
        future_queue.task_done()


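# A hedged end-to-end sketch combining the pieces above (assumes executorlib
# is installed and the default MpiExecSpawner can spawn local processes; the
# cache path is illustrative): executor_kwargs are forwarded to
# execute_parallel_tasks(), so cache_directory enables the h5-based cache:
#
#     >>> with InteractiveExecutor(
#     ...     max_workers=1,
#     ...     executor_kwargs={"cache_directory": "./cache"},
#     ... ) as exe:
#     ...     fs = exe.submit(sum, [1, 2, 3])
#     ...     print(fs.result())  # 6; a rerun is served from ./cache/<task_key>.h5out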