pyiron / executorlib / build 12480245925

24 Dec 2024 10:07AM UTC, coverage: 96.374% (+0.007%, up from 96.367%)

Pull Request #532: More error tests (github / web-flow)
Merge 7759fb24b into 5cf3ecc7e

3 of 3 new or added lines in 1 file covered (100.0%).
1 existing line in 1 file now uncovered.
1010 of 1048 relevant lines covered (96.37%); 0.96 hits per line.

Source File: /executorlib/interactive/shared.py (99.53% covered)

import importlib.util
import os
import queue
import sys
import time
from concurrent.futures import Future
from time import sleep
from typing import Callable, List, Optional

from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
from executorlib.standalone.command import get_command_path
from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import (
    SocketInterface,
    interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.standalone.thread import RaisingThread


class ExecutorBroker(ExecutorBase):
    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean up the resources associated with the Executor.

        It is safe to call this method several times. However, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None:
            for _ in range(len(self._process)):
                self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                for process in self._process:
                    process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):
        """
        Set the processes for the executor and start them.

        Args:
            process (List[RaisingThread]): The processes for the executor.
        """
        self._process = process
        for process in self._process:
            process.start()


class InteractiveExecutor(ExecutorBroker):
    """
    The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib interfaces to distribute python
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
    executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require
    the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to
    improve the usability, in particular when used in combination with Jupyter notebooks.

    Args:
        max_workers (int): defines the number of workers which can execute functions in parallel
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_workers: int = 1,
        executor_kwargs: dict = {},
        spawner: BaseSpawner = MpiExecSpawner,
    ):
        super().__init__(max_cores=executor_kwargs.get("max_cores", None))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["queue_join_on_shutdown"] = (
            False  # The same queue is shared over multiple threads
        )
        self._set_process(
            process=[
                RaisingThread(
                    target=execute_parallel_tasks,
                    kwargs=executor_kwargs,
                )
                for _ in range(max_workers)
            ],
        )


class InteractiveStepExecutor(ExecutorBase):
    """
    The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute
    python tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the
    executorlib.interactive.executor.InteractiveStepExecutor can be executed in a serial python process and does not
    require the python script to be executed with MPI. Consequently, it is primarily an abstraction of its
    functionality to improve the usability, in particular when used in combination with Jupyter notebooks.

    Args:
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel. Using max_cores is recommended.
        executor_kwargs (dict): keyword arguments for the executor
        spawner (BaseSpawner): interface class to initiate python processes

    Examples:

        >>> import numpy as np
        >>> from executorlib.interactive.executor import InteractiveStepExecutor
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> with InteractiveStepExecutor(max_cores=2) as p:
        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: dict = {},
        spawner: BaseSpawner = MpiExecSpawner,
    ):
        super().__init__(max_cores=executor_kwargs.get("max_cores", None))
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["max_cores"] = max_cores
        executor_kwargs["max_workers"] = max_workers
        self._set_process(
            RaisingThread(
                target=execute_separate_tasks,
                kwargs=executor_kwargs,
            )
        )


def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: BaseSpawner = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    **kwargs,
) -> None:
    """
    Execute submitted tasks one after another, each in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start the process on the selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to communicate with an Executor
                                     running on a different compute node within the same allocation. In principle any
                                     computer should be able to resolve its own hostname to the same address as
                                     localhost, but macOS >= 12 seems to disable this lookup for security reasons, so
                                     on macOS this option has to be set to True.
       init_function (callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            if queue_join_on_shutdown:
                future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            if cache_directory is None:
                _execute_task(
                    interface=interface, task_dict=task_dict, future_queue=future_queue
                )
            else:
                _execute_task_with_cache(
                    interface=interface,
                    task_dict=task_dict,
                    future_queue=future_queue,
                    cache_directory=cache_directory,
                )


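# The worker loop in execute_parallel_tasks() consumes plain dictionaries from
# ``future_queue``. As a hedged sketch, the two message types it understands
# look roughly like this (the concrete function and values are illustrative
# only):
#
#     fut = Future()
#     future_queue.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": fut})
#     future_queue.put({"shutdown": True, "wait": True})  # ends the worker loop

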
def execute_separate_tasks(
    future_queue: queue.Queue,
    spawner: BaseSpawner = MpiExecSpawner,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Execute tasks in parallel using the message passing interface (MPI), spawning a separate process for each task.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       spawner (BaseSpawner): Interface to start the process on the selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                          cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                          recommended, as computers have a limited number of compute cores.
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to communicate with an Executor
                                     running on a different compute node within the same allocation. In principle any
                                     computer should be able to resolve its own hostname to the same address as
                                     localhost, but macOS >= 12 seems to disable this lookup for security reasons, so
                                     on macOS this option has to be set to True.
    """
    active_task_dict = {}
    process_lst, qtask_lst = [], []
    if "cores" not in kwargs.keys():
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            qtask = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                max_workers=max_workers,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()


def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects
    produced by other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Refresh rate in seconds, i.e. how frequently the input queue is checked.
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None
            and "fn" in task_dict.keys()
            and "future" in task_dict.keys()
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)


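# A minimal sketch of how execute_tasks_with_dependencies() could be wired up.
# The queue names and ``some_executor`` are illustrative assumptions, not part
# of this module's public API:
#
#     outer_queue, inner_queue = queue.Queue(), queue.Queue()
#     broker = RaisingThread(
#         target=execute_tasks_with_dependencies,
#         kwargs={
#             "future_queue": outer_queue,    # receives user submissions
#             "executor_queue": inner_queue,  # feeds the wrapped executor
#             "executor": some_executor,      # e.g. an InteractiveExecutor
#         },
#     )
#     broker.start()

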
def _get_backend_path(
    cores: int,
) -> list:
    """
    Get the command to call the backend as a list of two strings.

    Args:
        cores (int): Number of cores used to execute the task; if it is greater than one use interactive_parallel.py,
                     else interactive_serial.py

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        raise ImportError(  # the one line left uncovered in this build
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst


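# For illustration, the returned command list pairs the current interpreter
# with the matching backend script. The paths below are assumptions and depend
# on the environment and the executorlib install location:
#
#     _get_backend_path(cores=1)
#     # ['/usr/bin/python3', '/.../interactive_serial.py']
#     _get_backend_path(cores=2)  # needs mpi4py installed
#     # ['/usr/bin/python3', '/.../interactive_parallel.py']

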
def _wait_for_free_slots(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
    """
    if max_cores is not None:
        while sum(active_task_dict.values()) + cores_requested > max_cores:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    elif max_workers is not None and max_cores is None:
        while len(active_task_dict.values()) + 1 > max_workers:
            active_task_dict = {
                k: v for k, v in active_task_dict.items() if not k.done()
            }
    return active_task_dict


def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Submit the waiting tasks whose Future inputs have completed to the executor.

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose Future inputs have not yet completed
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all([future.done() for future in task_wait_dict["future_lst"]]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst


def _update_futures_in_input(args: tuple, kwargs: dict):
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def get_result(arg):
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = [get_result(arg=arg) for arg in args]
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs


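# Illustration of the recursive resolution performed by
# _update_futures_in_input(); the values are made up and the futures are
# assumed to be done already:
#
#     f = Future()
#     f.set_result(2)
#     _update_futures_in_input(args=(1, [f, 3]), kwargs={"x": f})
#     # -> ([1, [2, 3]], {"x": 2})

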
def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and whether all of these future objects have already
    finished executing.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag if all future objects are already done
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag


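# Example with a hypothetical task dictionary: the boolean flag only turns True
# once every collected future reports done():
#
#     pending = Future()
#     _get_future_objects_from_input(
#         task_dict={"fn": print, "args": (pending,), "kwargs": {}}
#     )
#     # -> ([pending], False)

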
def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    spawner: BaseSpawner,
    executor_kwargs: dict,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
):
    """
    Submit a function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        spawner (BaseSpawner): Interface to start the process on the selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
                           is recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to communicate with an Executor
                                      running on a different compute node within the same allocation. In principle any
                                      computer should be able to resolve its own hostname to the same address as
                                      localhost, but macOS >= 12 seems to disable this lookup for security reasons, so
                                      on macOS this option has to be set to True.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict").copy()
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict.keys() or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=slots_required,
        max_cores=max_cores,
        max_workers=max_workers,
    )
    active_task_dict[task_dict["future"]] = slots_required
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "spawner": spawner,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict


def _execute_task(
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
    """
    Execute the task in the task_dict by communicating it via the interface.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
    """
    f = task_dict.pop("future")
    if f.set_running_or_notify_cancel():
        try:
            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
        except Exception as thread_exception:
            interface.shutdown(wait=True)
            future_queue.task_done()
            f.set_exception(exception=thread_exception)
            raise thread_exception
        else:
            future_queue.task_done()


def _execute_task_with_cache(
    interface: SocketInterface,
    task_dict: dict,
    future_queue: queue.Queue,
    cache_directory: str,
):
    """
    Execute the task in the task_dict by communicating it via the interface, using the cache in the cache directory.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.
    """
    from executorlib.standalone.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        fn=task_dict["fn"],
        fn_args=task_dict["args"],
        fn_kwargs=task_dict["kwargs"],
        resource_dict=task_dict.get("resource_dict", {}),
    )
    os.makedirs(cache_directory, exist_ok=True)
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    if task_key + ".h5out" not in os.listdir(cache_directory):
        f = task_dict.pop("future")
        if f.set_running_or_notify_cancel():
            try:
                time_start = time.time()
                result = interface.send_and_receive_dict(input_dict=task_dict)
                data_dict["output"] = result
                data_dict["runtime"] = time.time() - time_start
                dump(file_name=file_name, data_dict=data_dict)
                f.set_result(result)
            except Exception as thread_exception:
                interface.shutdown(wait=True)
                future_queue.task_done()
                f.set_exception(exception=thread_exception)
                raise thread_exception
            else:
                future_queue.task_done()
    else:
        _, result = get_output(file_name=file_name)
        future = task_dict["future"]
        future.set_result(result)
        future_queue.task_done()
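

# Caching sketch: results are keyed by the task hash returned from
# serialize_funct_h5(), so resubmitting the same function with the same inputs
# reuses the stored "<task_key>.h5out" file instead of recomputing. A hedged,
# illustrative call (``task_queue`` is a hypothetical queue.Queue following the
# task protocol shown above):
#
#     execute_parallel_tasks(
#         future_queue=task_queue,
#         cores=1,
#         cache_directory="cache",  # switches to _execute_task_with_cache()
#     )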