pyiron / pympipool · build 9890133465 (push, via github / web-flow)

11 Jul 2024 10:52AM UTC · coverage: 93.776% (889 of 948 relevant lines, 0.94 hits per line) · remained the same

Merge pull request #370 from pyiron/dependabot/pip/matplotlib-3.9.1
Bump matplotlib from 3.9.0 to 3.9.1

Source file: /pympipool/shared/executor.py · 96.22% covered

import importlib.util
import inspect
import os
import queue
import sys
from concurrent.futures import (
    Executor as FutureExecutor,
)
from concurrent.futures import (
    Future,
)
from time import sleep
from typing import List, Optional

import cloudpickle

from pympipool.shared.communication import interface_bootup
from pympipool.shared.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from pympipool.shared.interface import BaseInterface, MpiExecInterface
from pympipool.shared.thread import RaisingThread


class ExecutorBase(FutureExecutor):
    def __init__(self):
        cloudpickle_register(ind=3)
        self._future_queue = queue.Queue()
        self._process = None

    @property
    def info(self):
        if self._process is not None and isinstance(self._process, list):
            meta_data_dict = self._process[0]._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            meta_data_dict = self._process._kwargs.copy()
            if "future_queue" in meta_data_dict.keys():
                del meta_data_dict["future_queue"]
            return meta_data_dict
        else:
            return None

    @property
    def future_queue(self):
        return self._future_queue  # not covered

    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            A Future representing the given call.
        """
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # not covered
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait and self._process is not None:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: RaisingThread):
        self._process = process
        self._process.start()

    def __len__(self):
        return self._future_queue.qsize()

    def __del__(self):
        try:
            self.shutdown(wait=False)
        except (AttributeError, RuntimeError):
            pass
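

# --- Editor's illustration (not part of the original source) ----------------
# A minimal sketch of the task-dict protocol behind ExecutorBase.submit():
# each queue item carries {"fn", "args", "kwargs", "future"}, a worker
# resolves the Future, and {"shutdown": True, "wait": ...} ends the loop.
# The inline worker below is hypothetical; the real workers are the
# execute_*_tasks() functions further down, which forward tasks over zmq.
def _example_task_dict_protocol():
    from concurrent.futures import Future

    task_queue = queue.Queue()
    f = Future()
    task_queue.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": f})
    task_queue.put({"shutdown": True, "wait": True})
    while True:  # stand-in for the RaisingThread worker loop
        task = task_queue.get()
        if task.get("shutdown"):
            task_queue.task_done()
            break
        if task["future"].set_running_or_notify_cancel():
            task["future"].set_result(task["fn"](*task["args"], **task["kwargs"]))
        task_queue.task_done()
    assert f.result() == 6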


class ExecutorBroker(ExecutorBase):
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        if self._process is not None:
            for _ in range(len(self._process)):
                self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                for process in self._process:
                    process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: List[RaisingThread]):
        self._process = process
        for process in self._process:
            process.start()


class ExecutorSteps(ExecutorBase):
    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            A Future representing the given call.
        """
        check_resource_dict(function=fn)
        f = Future()
        self._future_queue.put(
            {
                "fn": fn,
                "args": args,
                "kwargs": kwargs,
                "future": f,
                "resource_dict": resource_dict,
            }
        )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)  # not covered
        if self._process is not None:
            self._future_queue.put({"shutdown": True, "wait": wait})
            if wait:
                self._process.join()
                self._future_queue.join()
        self._process = None
        self._future_queue = None
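

# --- Editor's illustration (not part of the original source) ----------------
# Unlike ExecutorBase.submit(), which requires an empty resource_dict,
# ExecutorSteps.submit() attaches a per-task resource_dict, so every step
# can request its own resources. A sketch of the resulting queue item:
def _example_steps_task_dict():
    from concurrent.futures import Future

    q = queue.Queue()
    q.put(
        {
            "fn": max,
            "args": ((3, 1, 2),),
            "kwargs": {},
            "future": Future(),
            "resource_dict": {"cores": 2},  # consumed by execute_separate_tasks()
        }
    )
    item = q.get()
    assert set(item) == {"fn", "args", "kwargs", "future", "resource_dict"}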


def cancel_items_in_queue(que: queue.Queue):
    """
    Cancel items which are still waiting in the queue. If the executor is busy, tasks remain in the queue, so their
    future objects have to be cancelled when the executor shuts down.

    Args:
        que (queue.Queue): Queue with task objects which should be executed
    """
    while True:
        try:
            item = que.get_nowait()
            if isinstance(item, dict) and "future" in item.keys():
                item["future"].cancel()
                que.task_done()
        except queue.Empty:
            break
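

# --- Editor's illustration (not part of the original source) ----------------
# A quick check of cancel_items_in_queue(): pending futures are cancelled,
# while a future that is already running is left alone, because
# Future.cancel() refuses to cancel a RUNNING future.
def _example_cancel_items_in_queue():
    from concurrent.futures import Future

    que = queue.Queue()
    pending, running = Future(), Future()
    running.set_running_or_notify_cancel()  # transition to RUNNING
    que.put({"fn": print, "future": pending})
    que.put({"fn": print, "future": running})
    cancel_items_in_queue(que=que)
    assert pending.cancelled() and not running.cancelled()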


def cloudpickle_register(ind: int = 2):
    """
    Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to be
    pickled by value rather than by reference, so the module which calls the map function is pickled by value:
    https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs
    inspect can help to find the module which is calling pympipool:
    https://docs.python.org/3/library/inspect.html
    Another good read to learn more about inspect:
    http://pymotw.com/2/inspect/index.html#module-inspect
    An index of 1 refers to one level above the map function.

    Args:
        ind (int): index of the stack level at which pickling by value starts; the rest is pickled by reference
    """
    try:  # When executed in a jupyter notebook this can cause a ValueError - in this case we just ignore it.
        cloudpickle.register_pickle_by_value(inspect.getmodule(inspect.stack()[ind][0]))
    except IndexError:  # not covered
        cloudpickle_register(ind=ind - 1)
    except ValueError:  # not covered
        pass
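

# --- Editor's illustration (not part of the original source) ----------------
# What cloudpickle_register() does, spelled out: walk ind frames up the call
# stack, find the module that frame belongs to, and ask cloudpickle to
# pickle that whole module by value, so functions defined there survive the
# trip to the worker process. A hedged sketch of the same two calls:
def _example_pickle_by_value():
    caller_module = inspect.getmodule(inspect.stack()[1][0])  # 1 = our caller
    if caller_module is not None:  # getmodule() can return None in a shell
        cloudpickle.register_pickle_by_value(caller_module)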


def execute_parallel_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    interface_class: BaseInterface = MpiExecInterface,
    hostname_localhost: bool = False,
    init_function: Optional[callable] = None,
    prefix_name: Optional[str] = None,
    prefix_path: Optional[str] = None,
    **kwargs,
):
    """
    Execute a single task at a time in parallel using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       interface_class (BaseInterface): Interface to start process on selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but macOS >= 12 seems to disable this lookup for security
                                     reasons, so on macOS this option has to be set to True.
       init_function (callable): optional function to preset arguments for functions which are submitted later
       prefix_name (str): name of the conda environment to initialize
       prefix_path (str): path of the conda environment to initialize
    """
    interface = interface_bootup(
        command_lst=_get_backend_path(
            cores=cores,
        ),
        connections=interface_class(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        prefix_path=prefix_path,
        prefix_name=prefix_name,
    )
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            interface.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            f = task_dict.pop("future")
            if f.set_running_or_notify_cancel():
                try:
                    f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
                except Exception as thread_exception:
                    interface.shutdown(wait=True)
                    future_queue.task_done()
                    f.set_exception(exception=thread_exception)
                    raise thread_exception
                else:
                    future_queue.task_done()
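

# --- Editor's illustration (not part of the original source) ----------------
# The loop above drives the standard concurrent.futures state machine:
# set_running_or_notify_cancel() claims a task (returning False if it was
# cancelled in the meantime), then exactly one of set_result() or
# set_exception() completes it. The same protocol in miniature:
def _example_future_state_machine():
    from concurrent.futures import Future

    f = Future()
    if f.set_running_or_notify_cancel():  # True: not cancelled yet
        try:
            f.set_result(1 / 1)
        except Exception as thread_exception:
            f.set_exception(thread_exception)
    assert f.done() and f.result() == 1.0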


def execute_separate_tasks(
    future_queue: queue.Queue,
    interface_class: BaseInterface = MpiExecInterface,
    max_cores: int = 1,
    hostname_localhost: bool = False,
    **kwargs,
):
    """
    Execute tasks in parallel, each in a separate process, using the message passing interface (MPI).

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       interface_class (BaseInterface): Interface to start process on selected compute resources
       max_cores (int): defines the number of cores which can be used in parallel
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this is essential to be able to communicate with an
                                     Executor running on a different compute node within the same allocation. In
                                     principle any computer should be able to resolve its own hostname to the same
                                     address as localhost, but macOS >= 12 seems to disable this lookup for security
                                     reasons, so on macOS this option has to be set to True.
    """
    active_task_dict = {}
    process_lst, qtask_lst = [], []
    if "cores" not in kwargs.keys():
        kwargs["cores"] = 1
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
            if task_dict["wait"]:
                _ = [process.join() for process in process_lst]
            future_queue.task_done()
            future_queue.join()
            break
        elif "fn" in task_dict.keys() and "future" in task_dict.keys():
            qtask = queue.Queue()
            process, active_task_dict = _submit_function_to_separate_process(
                task_dict=task_dict,
                qtask=qtask,
                active_task_dict=active_task_dict,
                interface_class=interface_class,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                hostname_localhost=hostname_localhost,
            )
            qtask_lst.append(qtask)
            process_lst.append(process)
            future_queue.task_done()
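

# --- Editor's illustration (not part of the original source) ----------------
# Each task handled above gets a private queue holding exactly two items:
# the task itself and an immediate shutdown request, so the helper thread
# started by _submit_function_to_separate_process() exits after its single
# task. The handoff in miniature:
def _example_single_task_queue():
    from concurrent.futures import Future

    qtask = queue.Queue()
    qtask.put({"fn": abs, "args": (-1,), "kwargs": {}, "future": Future()})
    qtask.put({"shutdown": True, "wait": True})
    return qtask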


def execute_tasks_with_dependencies(
    future_queue: queue.Queue,
    executor_queue: queue.Queue,
    executor: ExecutorBase,
    refresh_rate: float = 0.01,
):
    """
    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects
    from other tasks.

    Args:
        future_queue (Queue): Queue for receiving new tasks.
        executor_queue (Queue): Queue for the internal executor.
        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
    """
    wait_lst = []
    while True:
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            task_dict = None
        if (  # shutdown the executor
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            executor.shutdown(wait=task_dict["wait"])
            future_queue.task_done()
            future_queue.join()
            break
        elif (  # handle function submitted to the executor
            task_dict is not None
            and "fn" in task_dict.keys()
            and "future" in task_dict.keys()
        ):
            future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
            if len(future_lst) == 0 or ready_flag:
                # No future objects are used in the input or all future objects are already done
                task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
                    args=task_dict["args"], kwargs=task_dict["kwargs"]
                )
                executor_queue.put(task_dict)
            else:  # Otherwise add the function to the wait list
                task_dict["future_lst"] = future_lst
                wait_lst.append(task_dict)
            future_queue.task_done()
        elif len(wait_lst) > 0:
            number_waiting = len(wait_lst)
            # Check functions in the wait list and execute them if all future objects are now ready
            wait_lst = _submit_waiting_task(
                wait_lst=wait_lst, executor_queue=executor_queue
            )
            # if no job is ready, sleep for a moment
            if len(wait_lst) == number_waiting:
                sleep(refresh_rate)
        else:
            # If there is nothing else to do, sleep for a moment
            sleep(refresh_rate)
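

# --- Editor's illustration (not part of the original source) ----------------
# How the dependency scan above treats a task whose argument is a Future
# from an earlier task: the task is held back until the Future is done,
# then the Future is replaced by its result before dispatch.
def _example_dependency_resolution():
    upstream = Future()
    task_dict = {"fn": sum, "args": ([upstream, 2],), "kwargs": {}}
    future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
    assert future_lst == [upstream] and not ready_flag  # would join wait_lst
    upstream.set_result(1)
    args, kwargs = _update_futures_in_input(
        args=task_dict["args"], kwargs=task_dict["kwargs"]
    )
    assert sum(*args) == 3  # the Future was replaced by its result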


def get_command_path(executable: str) -> str:
    """
    Get path of the backend executable script

    Args:
        executable (str): Name of the backend executable script, either interactive_parallel.py or interactive_serial.py

    Returns:
        str: absolute path to the executable script
    """
    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))


def _get_backend_path(
    cores: int,
) -> list:
    """
    Get command to call backend as a list of two strings

    Args:
        cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst += [get_command_path(executable="interactive_parallel.py")]
    elif cores > 1:
        raise ImportError(  # not covered
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="interactive_serial.py")]
    return command_lst
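

# --- Editor's illustration (not part of the original source) ----------------
# The backend command is simply the current interpreter plus one of the two
# scripts resolved above, e.g. [sys.executable, ".../backend/interactive_serial.py"]
# for a single core:
def _example_backend_command():
    command_lst = _get_backend_path(cores=1)
    assert command_lst[0] == sys.executable
    assert command_lst[1].endswith("interactive_serial.py")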


def _get_command_path(executable: str) -> str:
    """
    Get path of the backend executable script

    Args:
        executable (str): Name of the backend executable script, either interactive_parallel.py or interactive_serial.py

    Returns:
        str: absolute path to the executable script
    """
    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))  # not covered


def _wait_for_free_slots(
    active_task_dict: dict, cores_requested: int, max_cores: int
) -> dict:
    """
    Wait for computing resources to become available.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number of cores which can be used

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require
    """
    while sum(active_task_dict.values()) + cores_requested > max_cores:
        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
    return active_task_dict
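

# --- Editor's illustration (not part of the original source) ----------------
# The slot accounting is a busy-wait over a {Future: cores} mapping:
# entries whose futures are done are evicted until the requested cores fit
# under max_cores. With a finished 2-core task occupying a 2-core budget,
# a new 2-core request fits as soon as the finished entry is dropped:
def _example_wait_for_free_slots():
    from concurrent.futures import Future

    finished = Future()
    finished.set_running_or_notify_cancel()
    finished.set_result(None)  # still booked for 2 cores, but already done
    active = _wait_for_free_slots(
        active_task_dict={finished: 2}, cores_requested=2, max_cores=2
    )
    assert active == {}  # the finished task was evicted to free its cores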


def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> list:
    """
    Submit those waiting tasks whose future inputs have been completed to the executor

    Args:
        wait_lst (list): List of waiting tasks
        executor_queue (Queue): Queue of the internal executor

    Returns:
        list: list of tasks whose future inputs have not yet been completed
    """
    wait_tmp_lst = []
    for task_wait_dict in wait_lst:
        if all([future.done() for future in task_wait_dict["future_lst"]]):
            del task_wait_dict["future_lst"]
            task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
            )
            executor_queue.put(task_wait_dict)
        else:
            wait_tmp_lst.append(task_wait_dict)
    return wait_tmp_lst


def _update_futures_in_input(args: tuple, kwargs: dict):
    """
    Evaluate future objects in the arguments and keyword arguments by calling future.result()

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def get_result(arg):
        if isinstance(arg, Future):
            return arg.result()
        elif isinstance(arg, list):
            return [get_result(arg=el) for el in arg]
        else:
            return arg

    args = [get_result(arg=arg) for arg in args]
    kwargs = {key: get_result(arg=value) for key, value in kwargs.items()}
    return args, kwargs


def _get_future_objects_from_input(task_dict: dict):
    """
    Check whether the input parameters contain future objects and whether all of these future objects are already done

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        list, boolean: list of future objects and boolean flag if all future objects are already done
    """
    future_lst = []

    def find_future_in_list(lst):
        for el in lst:
            if isinstance(el, Future):
                future_lst.append(el)
            elif isinstance(el, list):
                find_future_in_list(lst=el)

    find_future_in_list(lst=task_dict["args"])
    find_future_in_list(lst=task_dict["kwargs"].values())
    boolean_flag = len([future for future in future_lst if future.done()]) == len(
        future_lst
    )
    return future_lst, boolean_flag


def _submit_function_to_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    qtask: queue.Queue,
    interface_class: BaseInterface,
    executor_kwargs: dict,
    max_cores: int = 1,
    hostname_localhost: bool = False,
):
    """
    Submit a function to be executed in a separate Python process.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
        interface_class (BaseInterface): Interface to start process on selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor
        max_cores (int): defines the number of cores which can be used in parallel
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this is essential to be able to communicate with an
                                      Executor running on a different compute node within the same allocation. In
                                      principle any computer should be able to resolve its own hostname to the same
                                      address as localhost, but macOS >= 12 seems to disable this lookup for security
                                      reasons, so on macOS this option has to be set to True.

    Returns:
        RaisingThread, dict: thread for communicating with the python process which is executing the function and
                             dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict")
    qtask.put(task_dict)
    qtask.put({"shutdown": True, "wait": True})
    if "cores" not in resource_dict.keys() or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=resource_dict["cores"],
        max_cores=max_cores,
    )
    active_task_dict[task_dict["future"]] = resource_dict["cores"]
    task_kwargs = executor_kwargs.copy()
    task_kwargs.update(resource_dict)
    task_kwargs.update(
        {
            "future_queue": qtask,
            "interface_class": interface_class,
            "hostname_localhost": hostname_localhost,
            "init_function": None,
        }
    )
    process = RaisingThread(
        target=execute_parallel_tasks,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict