• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 11907763573

19 Nov 2024 06:52AM UTC coverage: 95.648%. Remained the same
11907763573

push

github

web-flow
[pre-commit.ci] pre-commit autoupdate (#502)

* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.7.3 → v0.7.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.7.3...v0.7.4)

* Update environment-openmpi.yml

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jan Janssen <jan-janssen@users.noreply.github.com>

945 of 988 relevant lines covered (95.65%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/executorlib/interactive/flux.py
1
import os
1✔
2
from typing import Optional
1✔
3

4
import flux.job
1✔
5

6
from executorlib.standalone.interactive.spawner import BaseSpawner
1✔
7

8

9
class FluxPythonSpawner(BaseSpawner):
    """
    A class representing the FluxPythonInterface.

    Args:
        cwd (str, optional): The current working directory. Defaults to None.
        cores (int, optional): The number of cores. Defaults to 1.
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
        flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
        flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
        flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
    """

    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        openmpi_oversubscribe: bool = False,
        flux_executor: Optional[flux.job.FluxExecutor] = None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
    ):
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
        )
        self._threads_per_core = threads_per_core
        self._gpus_per_core = gpus_per_core
        self._flux_executor = flux_executor
        self._flux_executor_pmi_mode = flux_executor_pmi_mode
        self._flux_executor_nesting = flux_executor_nesting
        # Set by bootup(); None means no job has been submitted yet.
        self._future = None

    def bootup(
        self,
        command_lst: list[str],
    ):
        """
        Boot up the client process to connect to the SocketInterface.

        Args:
            command_lst (list[str]): List of strings to start the client process.
        Raises:
            ValueError: If oversubscribing is not supported for the Flux adapter or if conda environments are not supported.
        """
        if self._openmpi_oversubscribe:
            raise ValueError(
                "Oversubscribing is currently not supported for the Flux adapter."
            )
        if self._flux_executor is None:
            # Lazily create an executor when the caller did not supply one.
            self._flux_executor = flux.job.FluxExecutor()
        if not self._flux_executor_nesting:
            jobspec = flux.job.JobspecV1.from_command(
                command=command_lst,
                num_tasks=self._cores,
                cores_per_task=self._threads_per_core,
                gpus_per_task=self._gpus_per_core,
                num_nodes=None,
                exclusive=False,
            )
        else:
            # Nested mode launches the command inside a sub-instance of Flux.
            jobspec = flux.job.JobspecV1.from_nest_command(
                command=command_lst,
                num_slots=self._cores,
                cores_per_slot=self._threads_per_core,
                gpus_per_slot=self._gpus_per_core,
                num_nodes=None,
                exclusive=False,
            )
        # Propagate the parent environment to the spawned job.
        jobspec.environment = dict(os.environ)
        if self._flux_executor_pmi_mode is not None:
            jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
        if self._cwd is not None:
            jobspec.cwd = self._cwd
        self._future = self._flux_executor.submit(jobspec)

    def shutdown(self, wait: bool = True):
        """
        Shutdown the FluxPythonInterface.

        Args:
            wait (bool, optional): Whether to wait for the execution to complete. Defaults to True.
        """
        if self._future is None:
            # bootup() was never called (or no job was ever submitted);
            # there is nothing to cancel or wait for.
            return
        if self.poll():
            self._future.cancel()
        # The flux future objects are not instantly updated,
        # still showing running after cancel was called,
        # so we wait until the execution is completed.
        if wait:
            self._future.result()

    def poll(self):
        """
        Check if the FluxPythonInterface is running.

        Returns:
            bool: True if the interface is running, False otherwise.
        """
        return self._future is not None and not self._future.done()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc