
pyiron / executorlib, build 11541431333

27 Oct 2024 02:48PM UTC. Coverage: 94.012% (-0.6%) from 94.582%.

Push via GitHub (web-flow): Split shared cache in backend and frontend (#443)

* Split shared cache in backend and frontend
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

2 of 2 new or added lines in 1 file covered (100.0%).
4 existing lines in 1 file are now uncovered.
785 of 835 relevant lines covered (94.01%).
0.94 hits per line.

Source File: /executorlib/shared/executor.py (96.86% covered; lines with no hits in this build are marked "# uncovered" below)

import importlib.util
import inspect
import os
import queue
import sys
from concurrent.futures import (
    Executor as FutureExecutor,
)
from concurrent.futures import (
    Future,
)
from time import sleep
from typing import Callable, List, Optional

import cloudpickle

from executorlib.shared.command import get_command_path
from executorlib.shared.communication import SocketInterface, interface_bootup
from executorlib.shared.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.shared.serialize import serialize_funct_h5
from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
from executorlib.shared.thread import RaisingThread


class ExecutorBase(FutureExecutor):
    """
    Base class for the executorlib executors, derived from concurrent.futures.Executor.
    """

    def __init__(self):
        """
        Initialize the ExecutorBase class.
        """
        cloudpickle_register(ind=3)
        self._future_queue: queue.Queue = queue.Queue()
        self._process: Optional[RaisingThread] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        if self._process is not None and isinstance(self._process, list):
            meta_data_dict = self._process[0]._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            meta_data_dict = self._process._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            return meta_data_dict
        else:
            return None

    @property
    def future_queue(self) -> queue.Queue:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._future_queue  # uncovered

    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # uncovered
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait and self._process is not None:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        """
        Set the process for the executor.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the number of tasks waiting in the queue of the executor.

        Returns:
            int: The number of tasks waiting in the queue.
        """
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean-up the resources associated with the Executor.
        """
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            pass

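ExecutorBase.submit only enqueues a task dictionary; a concrete executor has to attach a worker (via _set_process) that drains the queue and fulfills the futures. The following standalone sketch illustrates that task-dict protocol with a plain thread executing the function in place of executorlib's zmq-backed process; the _toy_worker helper is hypothetical and for illustration only.

import queue
import threading
from concurrent.futures import Future

def _toy_worker(future_queue: queue.Queue) -> None:
    # Drain {"fn", "args", "kwargs", "future"} dictionaries until a shutdown
    # dictionary arrives, fulfilling each Future in this thread (illustration
    # only; the real loop forwards the task to a separate process).
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            future_queue.task_done()
            break
        f = task_dict["future"]
        if f.set_running_or_notify_cancel():
            f.set_result(task_dict["fn"](*task_dict["args"], **task_dict["kwargs"]))
        future_queue.task_done()

task_queue: queue.Queue = queue.Queue()
threading.Thread(target=_toy_worker, args=(task_queue,), daemon=True).start()
fut = Future()
task_queue.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": fut})
print(fut.result())  # 6
task_queue.put({"shutdown": True, "wait": True})
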

class ExecutorBroker(ExecutorBase):
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None:
            for _ in range(len(self._process)):
                self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                for process in self._process:
                    process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):
        """
        Set the process for the executor.

        Args:
            process (List[RaisingThread]): The process for the executor.
        """
        self._process = process
        for process in self._process:
            process.start()


class ExecutorSteps(ExecutorBase):
    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            A Future representing the given call.
        """
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # uncovered
        if self._process is not None:
            self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                self._process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None


def cancel_items_in_queue(que: queue.Queue):
    """
    Cancel items which are still waiting in the queue. If the executor is busy, tasks remain in the queue, so the
    future objects have to be cancelled when the executor shuts down.

    Args:
        que (queue.Queue): Queue with task objects which should be executed
    """
    while True:
        try:
            item = que.get_nowait()
            if isinstance(item, dict) and "future" in item.keys():
                item["future"].cancel()
                que.task_done()
        except queue.Empty:
            break

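A standalone illustration of the effect, assuming this module is importable as executorlib.shared.executor: a future still waiting in the queue is cancelled and never started.

import queue
from concurrent.futures import Future

from executorlib.shared.executor import cancel_items_in_queue

que: queue.Queue = queue.Queue()
f = Future()
que.put({"fn": print, "args": (), "kwargs": {}, "future": f})
cancel_items_in_queue(que=que)
print(f.cancelled())  # True - the pending task was never started
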

def cloudpickle_register(ind: int = 2):
    """
    Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to
    be pickled by value rather than by reference, so the module which calls the map function is pickled by value:
    https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs
    The inspect module helps to find the module which is calling executorlib:
    https://docs.python.org/3/library/inspect.html
    Another good read to learn more about inspect:
    http://pymotw.com/2/inspect/index.html#module-inspect
    An index of 1 refers to one level higher than the map function.

    Args:
        ind (int): index of the level at which pickle by value starts, while pickle by reference is used for the rest
    """
    try:  # When executed in a Jupyter notebook this can cause a ValueError - in this case we just ignore it.
        cloudpickle.register_pickle_by_value(inspect.getmodule(inspect.stack()[ind][0]))
    except IndexError:  # uncovered
        cloudpickle_register(ind=ind - 1)  # uncovered
    except ValueError:  # uncovered
        pass  # uncovered

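This registration is what lets interactively defined functions survive the trip to the worker: once their module is registered for pickling by value, cloudpickle ships the function's code instead of an import path. A minimal standalone sketch of that mechanism:

import sys

import cloudpickle

def local_function(x):
    return x + 1

# Register the defining module so local_function is pickled by value:
cloudpickle.register_pickle_by_value(sys.modules[local_function.__module__])
payload = cloudpickle.dumps(local_function)
print(cloudpickle.loads(payload)(1))  # 2
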

def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: BaseSpawner = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Execute the submitted tasks one after another, each in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start the process on the selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but MacOS >= 12 seems to disable this lookup for security
                                     reasons, so on MacOS this option has to be set to True.
       init_function (callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            if cache_directory is None:
                _execute_task(
                    interface=interface, task_dict=task_dict, future_queue=future_queue
                )
            else:
                _execute_task_with_cache(
                    interface=interface,
                    task_dict=task_dict,
                    future_queue=future_queue,
                    cache_directory=cache_directory,
                )

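execute_parallel_tasks is the loop each worker thread runs. A sketch of how the executor classes above typically wire it up; it needs a working executorlib installation with its zmq backend, so treat it as an illustration rather than a guaranteed-runnable snippet:

import queue
from concurrent.futures import Future

from executorlib.shared.executor import execute_parallel_tasks
from executorlib.shared.spawner import MpiExecSpawner
from executorlib.shared.thread import RaisingThread

# One worker thread per queue, mirroring the pattern used in this module:
future_queue: queue.Queue = queue.Queue()
process = RaisingThread(
    target=execute_parallel_tasks,
    kwargs={"future_queue": future_queue, "cores": 1, "spawner": MpiExecSpawner},
)
process.start()
f = Future()
future_queue.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": f})
print(f.result())  # 6
future_queue.put({"shutdown": True, "wait": True})
process.join()
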

def execute_separate_tasks(
    future_queue: queue.Queue,
    spawner: BaseSpawner = MpiExecSpawner,
    max_cores: int = 1,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Execute each submitted task in a separate process, using the message passing interface (MPI) for parallel tasks.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       spawner (BaseSpawner): Interface to start the process on the selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but MacOS >= 12 seems to disable this lookup for security
                                     reasons, so on MacOS this option has to be set to True.
    """
    active_task_dict = {}
    process_lst, qtask_lst = [], []
    if "cores" not in kwargs.keys():
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            qtask = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()


def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects
    from other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None
            and "fn" in task_dict.keys()
            and "future" in task_dict.keys()
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)

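The wait-list step of this loop is handled by _submit_waiting_task (defined below). A standalone illustration, assuming this module is importable as executorlib.shared.executor: a task whose "future_lst" contains an unfinished Future stays in the wait list and is only forwarded to the executor queue once that Future completes.

import queue
from concurrent.futures import Future

from executorlib.shared.executor import _submit_waiting_task

dep = Future()
waiting = [
    {"fn": sum, "args": ([dep, 2],), "kwargs": {}, "future": Future(), "future_lst": [dep]}
]
executor_queue: queue.Queue = queue.Queue()
waiting = _submit_waiting_task(wait_lst=waiting, executor_queue=executor_queue)
print(len(waiting), executor_queue.qsize())  # 1 0 - dependency still pending
dep.set_result(1)
waiting = _submit_waiting_task(wait_lst=waiting, executor_queue=executor_queue)
print(len(waiting), executor_queue.qsize())  # 0 1 - forwarded with args [[1, 2]]
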

def _get_backend_path(
    cores: int,
) -> list:
    """
    Get the command to call the backend as a list of two strings.

    Args:
        cores (int): Number of cores used to execute the task; if it is greater than one, use
                     interactive_parallel.py, otherwise interactive_serial.py

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        raise ImportError(  # uncovered
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst


def _wait_for_free_slots(
    active_task_dict: dict, cores_requested: int, max_cores: int
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
    """
    while sum(active_task_dict.values()) + cores_requested > max_cores:
        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
    return active_task_dict

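A standalone illustration of this bookkeeping, assuming the module is importable as executorlib.shared.executor: entries whose futures are done are evicted until the requested cores fit under max_cores.

from concurrent.futures import Future

from executorlib.shared.executor import _wait_for_free_slots

finished, running = Future(), Future()
finished.set_result(None)
active = {finished: 2, running: 2}
# Requesting 2 more cores with max_cores=4 evicts the finished entry:
active = _wait_for_free_slots(active_task_dict=active, cores_requested=2, max_cores=4)
print(len(active))  # 1 - only the running task still occupies cores
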

def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Submit those waiting tasks whose future inputs have been completed to the executor.

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose future inputs have not yet been completed
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all([future.done() for future in task_wait_dict["future_lst"]]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst


def _update_futures_in_input(args: tuple, kwargs: dict):
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def get_result(arg):
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = [get_result(arg=arg) for arg in args]
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs


def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and whether those future objects are already done.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag indicating whether all future objects are already done
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag

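Together with _update_futures_in_input above, this helper implements the dependency check: scan nested arguments for Future objects, then replace each by its result once everything is done. A standalone illustration, again assuming the module is importable as executorlib.shared.executor:

from concurrent.futures import Future

from executorlib.shared.executor import (
    _get_future_objects_from_input,
    _update_futures_in_input,
)

dep = Future()
task_dict = {"fn": sum, "args": ([dep, 2],), "kwargs": {}}
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
print(len(future_lst), ready_flag)  # 1 False - the dependency is not done yet
dep.set_result(1)
_, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
print(ready_flag)  # True
args, kwargs = _update_futures_in_input(args=task_dict["args"], kwargs=task_dict["kwargs"])
print(args, kwargs)  # [[1, 2]] {}
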

def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    spawner: BaseSpawner,
    executor_kwargs: dict,
    max_cores: int = 1,
    hostname_localhost: Optional[bool] = None,
):
    """
    Submit a function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        spawner (BaseSpawner): Interface to start the process on the selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve its own hostname to the same
                                      address as localhost, but MacOS >= 12 seems to disable this lookup for security
                                      reasons, so on MacOS this option has to be set to True.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict")
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict.keys() or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=resource_dict["cores"],
        max_cores=max_cores,
    )
    active_task_dict[task_dict["future"]] = resource_dict["cores"]
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "spawner": spawner,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict


def _execute_task(
    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
    """
    Execute the task in the task_dict by communicating it via the interface.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
    """
    f = task_dict.pop("future")
    if f.set_running_or_notify_cancel():
        try:
            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
        except Exception as thread_exception:
            interface.shutdown(wait=True)
            future_queue.task_done()
            f.set_exception(exception=thread_exception)
            raise thread_exception
        else:
            future_queue.task_done()


def _execute_task_with_cache(
    interface: SocketInterface,
    task_dict: dict,
    future_queue: queue.Queue,
    cache_directory: str,
):
    """
    Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory.

    Args:
        interface (SocketInterface): socket interface for zmq communication
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        future_queue (Queue): Queue for receiving new tasks.
        cache_directory (str): The directory to store cache files.
    """
    from executorlib.shared.hdf import dump, get_output

    task_key, data_dict = serialize_funct_h5(
        task_dict["fn"], *task_dict["args"], **task_dict["kwargs"]
    )
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    if not os.path.exists(cache_directory):
        os.mkdir(cache_directory)
    future = task_dict["future"]
    if task_key + ".h5out" not in os.listdir(cache_directory):
        _execute_task(
            interface=interface,
            task_dict=task_dict,
            future_queue=future_queue,
        )
        data_dict["output"] = future.result()
        dump(file_name=file_name, data_dict=data_dict)
    else:
        _, result = get_output(file_name=file_name)
        future = task_dict["future"]
        future.set_result(result)
        future_queue.task_done()