
pyiron / executorlib · build 10230612576

03 Aug 2024 08:06PM UTC · coverage: 94.363% (+0.006%) from 94.357%

Pull Request #385: Add docstrings and typehints with Github copilot
Merge e53f22ece into 4255d1c26

42 of 43 new or added lines in 11 files covered (97.67%)
4 existing lines in 2 files now uncovered
904 of 958 relevant lines covered (94.36%)
0.94 hits per line

Source file: /executorlib/shared/executor.py · 96.22% covered
Lines marked "# uncovered" in the listing below were not executed in this build.
import importlib.util
import inspect
import os
import queue
import sys
from concurrent.futures import (
    Executor as FutureExecutor,
    Future,
)
from time import sleep
from typing import Callable, List, Optional

import cloudpickle

from executorlib.shared.communication import interface_bootup
from executorlib.shared.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.shared.interface import BaseInterface, MpiExecInterface
from executorlib.shared.thread import RaisingThread

class ExecutorBase(FutureExecutor):
    """
    Base class for the executorlib executors, derived from concurrent.futures.Executor.
    """

    def __init__(self):
        """
        Initialize the ExecutorBase class.
        """
        cloudpickle_register(ind=3)
        self._future_queue: queue.Queue = queue.Queue()
        self._process: Optional[RaisingThread] = None

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        if self._process is not None and isinstance(self._process, list):
            meta_data_dict = self._process[0]._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            meta_data_dict = self._process._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            return meta_data_dict
        else:
            return None

    @property
    def future_queue(self) -> queue.Queue:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._future_queue  # uncovered

    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # uncovered
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait and self._process is not None:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        """
        Set the process for the executor.

        Args:
            process (RaisingThread): The process for the executor.
        """
        self._process = process
        self._process.start()

    def __len__(self) -> int:
        """
        Get the number of tasks currently waiting in the queue.

        Returns:
            int: The number of queued tasks.
        """
        return self._future_queue.qsize()

    def __del__(self):
        """
        Clean up the resources associated with the Executor.
        """
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            pass
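
All executors in this file communicate with their worker threads purely through dictionaries on a queue.Queue: submit() enqueues {"fn", "args", "kwargs", "future"} and shutdown() enqueues {"shutdown", "wait"}. A minimal sketch of a consumer for that protocol, independent of executorlib (the consume_queue helper below is hypothetical, for illustration only):

import queue
from concurrent.futures import Future

def consume_queue(task_queue: queue.Queue):
    # Hypothetical worker loop mirroring the task protocol used above:
    # task dicts carry fn/args/kwargs/future, a shutdown dict ends the loop.
    while True:
        task = task_queue.get()
        if task.get("shutdown"):
            task_queue.task_done()
            break
        f = task["future"]
        if f.set_running_or_notify_cancel():
            try:
                f.set_result(task["fn"](*task["args"], **task["kwargs"]))
            except Exception as error:
                f.set_exception(error)
        task_queue.task_done()

q = queue.Queue()
f = Future()
q.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
consume_queue(q)
assert f.result() == 6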

class ExecutorBroker(ExecutorBase):
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None:
            for _ in range(len(self._process)):
                self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                for process in self._process:
                    process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):
        """
        Set the processes for the executor.

        Args:
            process (List[RaisingThread]): The processes for the executor.
        """
        self._process = process
        for process in self._process:
            process.start()
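
Because all worker threads block on the same shared queue, ExecutorBroker.shutdown() has to enqueue one shutdown message per worker; a single message would only stop whichever worker happens to receive it. Continuing the hypothetical consume_queue sketch from above:

import queue
import threading

q = queue.Queue()
workers = [threading.Thread(target=consume_queue, args=(q,)) for _ in range(4)]
for worker in workers:
    worker.start()
for _ in workers:
    # one shutdown sentinel per worker, as in ExecutorBroker.shutdown()
    q.put({"shutdown": True, "wait": True})
for worker in workers:
    worker.join()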

class ExecutorSteps(ExecutorBase):
    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # uncovered
        if self._process is not None:
            self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                self._process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None
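
Unlike ExecutorBase.submit(), which validates that resource_dict stays empty, ExecutorSteps forwards the per-call resource request to the worker, so each task can ask for its own resources. The only difference on the wire is one extra key in the task dictionary:

import queue
from concurrent.futures import Future

q = queue.Queue()
f = Future()
# Payload as enqueued by ExecutorSteps.submit(): the resource request
# travels with the task instead of being fixed per executor.
q.put({
    "fn": sum,
    "args": ([1, 2, 3],),
    "kwargs": {},
    "future": f,
    "resource_dict": {"cores": 2},
})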

def cancel_items_in_queue(que: queue.Queue):
    """
    Cancel items which are still waiting in the queue. If the executor is busy, tasks remain in the queue, so the
    future objects have to be cancelled when the executor shuts down.

    Args:
        que (queue.Queue): Queue with task objects which should be executed
    """
    while True:
        try:
            item = que.get_nowait()
            if isinstance(item, dict) and "future" in item.keys():
                item["future"].cancel()
                que.task_done()
        except queue.Empty:
            break

def cloudpickle_register(ind: int = 2):
    """
    Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to
    be pickled by value rather than by reference, so the module which calls the map function is pickled by value:
    https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs
    inspect can help to find the module which is calling executorlib:
    https://docs.python.org/3/library/inspect.html
    Another good read to learn more about inspect:
    http://pymotw.com/2/inspect/index.html#module-inspect
    An index of 1 refers to one level above the map function.

    Args:
        ind (int): index of the level at which pickle by value starts, while the rest is pickled by reference
    """
    try:  # When executed in a Jupyter notebook this can cause a ValueError - in this case we just ignore it.
        cloudpickle.register_pickle_by_value(inspect.getmodule(inspect.stack()[ind][0]))
    except IndexError:  # uncovered
        cloudpickle_register(ind=ind - 1)  # uncovered
    except ValueError:  # uncovered
        pass  # uncovered
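
Registering the calling module for pickle-by-value matters whenever the receiving process cannot import that module, for example functions defined in __main__ or in a script that only exists on the submitting machine. A minimal illustration, independent of executorlib (my_local_module is a hypothetical module name):

import cloudpickle

import my_local_module  # hypothetical module that only exists on this machine

# By default cloudpickle serializes importable functions by reference
# (module plus qualified name), which fails to unpickle on a machine
# without the module. After registration, the function's code and globals
# travel inside the payload itself.
cloudpickle.register_pickle_by_value(my_local_module)
payload = cloudpickle.dumps(my_local_module.some_function)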

def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    interface_class: BaseInterface = MpiExecInterface,
    hostname_localhost: bool = False,
    init_function: Optional[Callable] = None,
    prefix_name: Optional[str] = None,
    prefix_path: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Execute tasks from the queue in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       interface_class (BaseInterface): Interface to start the process on the selected compute resources
       hostname_localhost (bool): use localhost instead of the hostname to establish the zmq connection. In the
                                  context of an HPC cluster this is essential to be able to communicate with an
                                  Executor running on a different compute node within the same allocation. In
                                  principle any computer should be able to resolve that its own hostname points
                                  to the same address as localhost, but MacOS >= 12 seems to disable this lookup
                                  for security reasons, so on MacOS this option has to be set to True.
       init_function (callable): optional function to preset arguments for functions which are submitted later
       prefix_name (str): name of the conda environment to initialize
       prefix_path (str): path of the conda environment to initialize
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=interface_class(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        prefix_path=prefix_path,
        prefix_name=prefix_name,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            f = task_dict.pop("future")
            if f.set_running_or_notify_cancel():
                try:
                    f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
                except Exception as thread_exception:
                    interface.shutdown(wait=True)
                    future_queue.task_done()
                    f.set_exception(exception=thread_exception)
                    raise thread_exception
                else:
                    future_queue.task_done()
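
When a submitted function raises, the loop above records the exception on the Future before re-raising it in the worker thread (the RaisingThread wrapper, as its name suggests, propagates such exceptions). From the caller's side the failure is observable entirely through the standard Future API:

from concurrent.futures import Future

f = Future()
f.set_running_or_notify_cancel()
f.set_exception(RuntimeError("simulated task failure"))

assert isinstance(f.exception(), RuntimeError)
# f.result() would re-raise the RuntimeError in the calling thread.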

def execute_separate_tasks(
    future_queue: queue.Queue,
    interface_class: BaseInterface = MpiExecInterface,
    max_cores: int = 1,
    hostname_localhost: bool = False,
    **kwargs,
):
    """
    Execute each task from the queue in a separate process, using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       interface_class (BaseInterface): Interface to start the process on the selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       hostname_localhost (bool): use localhost instead of the hostname to establish the zmq connection. In the
                                  context of an HPC cluster this is essential to be able to communicate with an
                                  Executor running on a different compute node within the same allocation. In
                                  principle any computer should be able to resolve that its own hostname points
                                  to the same address as localhost, but MacOS >= 12 seems to disable this lookup
                                  for security reasons, so on MacOS this option has to be set to True.
    """
    active_task_dict = {}
    process_lst, qtask_lst = [], []
    if "cores" not in kwargs.keys():
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            qtask = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                interface_class=interface_class,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()
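
Each submitted function gets its own one-shot queue: _submit_function_to_separate_process() (below) enqueues the task and immediately follows it with a shutdown message, so the spawned worker executes exactly one function and then exits. In terms of the hypothetical consume_queue sketch from above:

import queue
from concurrent.futures import Future

qtask = queue.Queue()
f = Future()
qtask.put({"fn": max, "args": ([3, 1, 2],), "kwargs": {}, "future": f})
qtask.put({"shutdown": True, "wait": True})  # one-shot: stop after one task

consume_queue(qtask)
assert f.result() == 3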

def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects
    from other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None
            and "fn" in task_dict.keys()
            and "future" in task_dict.keys()
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)
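
This loop is what allows one task to consume another task's Future directly as an argument: tasks whose Future inputs are not yet done are parked in wait_lst instead of being forwarded. A hedged usage sketch, assuming some front-end executor object exe wired up to this loop (exe is hypothetical, for illustration only):

# The dependency loop holds the second task back until future_1 is done,
# then substitutes the finished Future with its result (via
# _update_futures_in_input) before forwarding the task to the executor.
future_1 = exe.submit(sum, [1, 2, 3])
future_2 = exe.submit(lambda x, y: x + y, future_1, 10)
print(future_2.result())  # 16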

def get_command_path(executable: str) -> str:
    """
    Get the path of the backend executable script.

    Args:
        executable (str): Name of the backend executable script, e.g. interactive_parallel.py or interactive_serial.py

    Returns:
        str: absolute path to the executable script
    """
    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))

def _get_backend_path(
    cores: int,
) -> list:
    """
    Get the command to call the backend as a list of two strings.

    Args:
        cores (int): Number of cores used to execute the task; if it is greater than one use
                     interactive_parallel.py, else interactive_serial.py

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        raise ImportError(  # uncovered
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst
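
Depending on the core count, the backend command resolves to either the serial or the MPI-parallel interactive script; the absolute paths differ per installation:

print(_get_backend_path(cores=1))
# e.g. ['/usr/bin/python3', '/.../executorlib/backend/interactive_serial.py']
print(_get_backend_path(cores=2))  # raises ImportError unless mpi4py is installed
# e.g. ['/usr/bin/python3', '/.../executorlib/backend/interactive_parallel.py']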

def _get_command_path(executable: str) -> str:
    """
    Get the path of the backend executable script.

    Args:
        executable (str): Name of the backend executable script, either interactive_parallel.py or interactive_serial.py

    Returns:
        str: absolute path to the executable script
    """
    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))  # uncovered

def _wait_for_free_slots(
    active_task_dict: dict, cores_requested: int, max_cores: int
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
    """
    while sum(active_task_dict.values()) + cores_requested > max_cores:
        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
    return active_task_dict
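
The slot bookkeeping is a plain dictionary mapping each running Future to the cores it occupies; the busy-wait loop simply drops finished futures until the pending request fits under max_cores. A small illustration:

from concurrent.futures import Future

running, finished = Future(), Future()
finished.set_running_or_notify_cancel()
finished.set_result(None)

active = {running: 2, finished: 2}
active = _wait_for_free_slots(active_task_dict=active, cores_requested=2, max_cores=4)
assert finished not in active  # the done future freed its two cores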

def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Submit those waiting tasks whose future inputs have been completed to the executor.

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose future inputs have not yet been completed
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all([future.done() for future in task_wait_dict["future_lst"]]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst

def _update_futures_in_input(args: tuple, kwargs: dict):
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def get_result(arg):
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = [get_result(arg=arg) for arg in args]
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs

def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and whether those future objects are already done.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and a boolean flag which is True if all future objects are already done
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag
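
Together these two helpers implement dependency detection and substitution: _get_future_objects_from_input() collects every Future, including inside nested lists, and _update_futures_in_input() later replaces each one with its result. A concrete round trip:

from concurrent.futures import Future

done = Future()
done.set_result(5)

task_dict = {"fn": print, "args": (done, [done]), "kwargs": {}, "resource_dict": {}}
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
assert future_lst == [done, done] and ready_flag

args, kwargs = _update_futures_in_input(args=task_dict["args"], kwargs=task_dict["kwargs"])
assert args == [5, [5]] and kwargs == {}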

def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    interface_class: BaseInterface,
    executor_kwargs: dict,
    max_cores: int = 1,
    hostname_localhost: bool = False,
):
    """
    Submit a function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        interface_class (BaseInterface): Interface to start the process on the selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        hostname_localhost (bool): use localhost instead of the hostname to establish the zmq connection. In the
                                   context of an HPC cluster this is essential to be able to communicate with an
                                   Executor running on a different compute node within the same allocation. In
                                   principle any computer should be able to resolve that its own hostname points
                                   to the same address as localhost, but MacOS >= 12 seems to disable this lookup
                                   for security reasons, so on MacOS this option has to be set to True.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict")
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict.keys() or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=resource_dict["cores"],
        max_cores=max_cores,
    )
    active_task_dict[task_dict["future"]] = resource_dict["cores"]
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "interface_class": interface_class,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict