pyiron / executorlib / 13257409288

11 Feb 2025 06:51AM UTC, coverage: 95.592% (-0.1% from 95.735%)

Pull Request #565: [feature] Add option to specify number of nodes
Merge ebd267ae8 into 659d0ded2

8 of 8 new or added lines in 2 files covered (100.0%).
12 existing lines in 4 files are now uncovered.
1106 of 1157 relevant lines covered (95.59%), 0.96 hits per line.

Source File

/executorlib/interactive/flux.py (98.0% of relevant lines covered; the nested FluxExecutor branch in bootup() is marked uncovered)
import os
from typing import Optional

import flux
import flux.job

from executorlib.standalone.interactive.spawner import BaseSpawner


def validate_max_workers(max_workers: int, cores: int, threads_per_core: int):
    handle = flux.Flux()
    cores_total = flux.resource.list.resource_list(handle).get().up.ncores
    cores_requested = max_workers * cores * threads_per_core
    if cores_total < cores_requested:
        raise ValueError(
            "The number of requested cores is larger than the available cores "
            + str(cores_total)
            + " < "
            + str(cores_requested)
        )


class FluxPythonSpawner(BaseSpawner):
    """
25
    A class representing the FluxPythonInterface.
26

27
    Args:
28
        cwd (str, optional): The current working directory. Defaults to None.
29
        cores (int, optional): The number of cores. Defaults to 1.
30
        threads_per_core (int, optional): The number of threads per base. Defaults to 1.
31
        gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0.
32
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
33
        flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
34
        flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
35
        flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
36
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
37
    """
38

    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        num_nodes: Optional[int] = None,
        exclusive: bool = False,
        openmpi_oversubscribe: bool = False,
        flux_executor: Optional[flux.job.FluxExecutor] = None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
        flux_log_files: bool = False,
    ):
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
        )
        self._threads_per_core = threads_per_core
        self._gpus_per_core = gpus_per_core
        self._num_nodes = num_nodes
        self._exclusive = exclusive
        self._flux_executor = flux_executor
        self._flux_executor_pmi_mode = flux_executor_pmi_mode
        self._flux_executor_nesting = flux_executor_nesting
        self._flux_log_files = flux_log_files
        self._future = None

    def bootup(
        self,
        command_lst: list[str],
    ):
        """
        Boot up the client process to connect to the SocketInterface.

        Args:
            command_lst (list[str]): List of strings to start the client process.

        Raises:
            ValueError: If oversubscribing is not supported for the Flux adapter or if conda environments are not supported.
        """
        if self._openmpi_oversubscribe:
            raise ValueError(
                "Oversubscribing is currently not supported for the Flux adapter."
            )
        if self._flux_executor is None:
            self._flux_executor = flux.job.FluxExecutor()
        if not self._flux_executor_nesting:
            jobspec = flux.job.JobspecV1.from_command(
                command=command_lst,
                num_tasks=self._cores,
                cores_per_task=self._threads_per_core,
                gpus_per_task=self._gpus_per_core,
                num_nodes=self._num_nodes,
                exclusive=self._exclusive,
            )
        else:
            jobspec = flux.job.JobspecV1.from_nest_command(
                command=command_lst,
                num_slots=self._cores,
                cores_per_slot=self._threads_per_core,
                gpus_per_slot=self._gpus_per_core,
                num_nodes=self._num_nodes,
                exclusive=self._exclusive,
            )
        jobspec.environment = dict(os.environ)
        if self._flux_executor_pmi_mode is not None:
            jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
        if self._cwd is not None:
            jobspec.cwd = self._cwd
        if self._flux_log_files and self._cwd is not None:
            jobspec.stderr = os.path.join(self._cwd, "flux.err")
            jobspec.stdout = os.path.join(self._cwd, "flux.out")
        elif self._flux_log_files:
            jobspec.stderr = os.path.abspath("flux.err")
            jobspec.stdout = os.path.abspath("flux.out")
        self._future = self._flux_executor.submit(jobspec)

    def shutdown(self, wait: bool = True):
        """
        Shutdown the FluxPythonSpawner.

        Args:
            wait (bool, optional): Whether to wait for the execution to complete. Defaults to True.
        """
        if self._future is not None:
            if self.poll():
                self._future.cancel()
            # The flux future objects are not instantly updated,
            # still showing running after cancel was called,
            # so we wait until the execution is completed.
            self._future.result()

    def poll(self):
        """
        Check if the FluxPythonSpawner is still running.

        Returns:
            bool: True if the spawned process is running, False otherwise.
        """
        return self._future is not None and not self._future.done()
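
For orientation, here is a minimal usage sketch of the spawner shown above. It is illustrative only and not part of the covered source: the working directory, the command list, and the worker and core counts are assumed values, and it presumes a running Flux instance with the Flux Python bindings and executorlib importable.

# Minimal usage sketch (assumed values; requires a running Flux instance).
from executorlib.interactive.flux import FluxPythonSpawner, validate_max_workers

# Check that 2 workers with 2 cores each fit into the current Flux allocation.
validate_max_workers(max_workers=2, cores=2, threads_per_core=1)

spawner = FluxPythonSpawner(
    cwd="/tmp/example",      # hypothetical working directory
    cores=2,
    num_nodes=1,             # the option introduced by Pull Request #565
    flux_log_files=True,     # write flux.out / flux.err into the working directory
)
spawner.bootup(command_lst=["python", "worker_script.py"])  # hypothetical command
print(spawner.poll())        # True while the Flux job is still active
spawner.shutdown(wait=True)  # cancel if needed and wait for the job to finish

Note that shutdown() calls result() after cancel() because, as the in-source comment explains, Flux futures may still report the job as running immediately after cancellation.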