pyiron / executorlib / build 12426327282

20 Dec 2024 05:52AM UTC coverage: 95.849% (+0.1%) from 95.704%

Pull Request #519: Add option to write flux log files
Merge 482cb8fef into e9a81f822 (github web-flow)

5 of 6 new or added lines in 4 files covered. (83.33%)
15 existing lines in 4 files now uncovered.
993 of 1036 relevant lines covered (95.85%)
0.96 hits per line

Source File: /executorlib/interactive/flux.py (95.74% covered)
import os
from typing import Optional

import flux
import flux.job

from executorlib.standalone.interactive.spawner import BaseSpawner


def validate_max_workers(max_workers, cores, threads_per_core):
    handle = flux.Flux()
    cores_total = flux.resource.list.resource_list(handle).get().up.ncores
    cores_requested = max_workers * cores * threads_per_core
    if cores_total < cores_requested:
        raise ValueError(
            "The number of requested cores is larger than the available cores "
            + str(cores_total)
            + " < "
            + str(cores_requested)
        )
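
# Illustrative sketch (not part of flux.py): on a hypothetical allocation with
# 8 cores up, a request for 4 workers * 2 cores * 2 threads_per_core = 16 cores
# would trip the check above and raise the ValueError:
#
#     validate_max_workers(max_workers=4, cores=2, threads_per_core=2)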


class FluxPythonSpawner(BaseSpawner):
    """
    A class representing the FluxPythonSpawner.

    Args:
        cwd (str, optional): The current working directory. Defaults to None.
        cores (int, optional): The number of cores. Defaults to 1.
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
        flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
        flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
        flux_executor_nesting (bool, optional): Whether to use a nested FluxExecutor. Defaults to False.
        flux_log_files (bool, optional): Write flux stdout and stderr log files. Defaults to False.
    """

    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        openmpi_oversubscribe: bool = False,
        flux_executor: Optional[flux.job.FluxExecutor] = None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
        flux_log_files: bool = False,
    ):
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
        )
        self._threads_per_core = threads_per_core
        self._gpus_per_core = gpus_per_core
        self._flux_executor = flux_executor
        self._flux_executor_pmi_mode = flux_executor_pmi_mode
        self._flux_executor_nesting = flux_executor_nesting
        self._flux_log_files = flux_log_files
        self._future = None

    def bootup(
        self,
        command_lst: list[str],
    ):
        """
        Boot up the client process to connect to the SocketInterface.

        Args:
            command_lst (list[str]): List of strings to start the client process.

        Raises:
            ValueError: If oversubscribing is requested, which is not supported by the Flux adapter.
        """
        if self._openmpi_oversubscribe:
            raise ValueError(
                "Oversubscribing is currently not supported for the Flux adapter."
            )
        if self._flux_executor is None:
            self._flux_executor = flux.job.FluxExecutor()  # UNCOV: not covered by tests
        if not self._flux_executor_nesting:
            jobspec = flux.job.JobspecV1.from_command(
                command=command_lst,
                num_tasks=self._cores,
                cores_per_task=self._threads_per_core,
                gpus_per_task=self._gpus_per_core,
                num_nodes=None,
                exclusive=False,
            )
        else:
            jobspec = flux.job.JobspecV1.from_nest_command(  # NEW: added line, not covered by tests
                command=command_lst,
                num_slots=self._cores,
                cores_per_slot=self._threads_per_core,
                gpus_per_slot=self._gpus_per_core,
                num_nodes=None,
                exclusive=False,
            )
        jobspec.environment = dict(os.environ)
        if self._flux_executor_pmi_mode is not None:
            jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
        if self._cwd is not None:
            jobspec.cwd = self._cwd
        if self._flux_log_files and self._cwd is not None:
            jobspec.stderr = os.path.join(self._cwd, "flux.err")
            jobspec.stdout = os.path.join(self._cwd, "flux.out")
        elif self._flux_log_files:
            jobspec.stderr = os.path.abspath("flux.err")
            jobspec.stdout = os.path.abspath("flux.out")
        self._future = self._flux_executor.submit(jobspec)
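
    # Illustrative note (not part of flux.py): with flux_log_files=True and a
    # hypothetical cwd of "/scratch/job_1", bootup() above would write the job's
    # streams to /scratch/job_1/flux.out and /scratch/job_1/flux.err; with no cwd
    # set, the files are created in the current directory via os.path.abspath().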

    def shutdown(self, wait: bool = True):
        """
        Shut down the FluxPythonSpawner.

        Args:
            wait (bool, optional): Whether to wait for the execution to complete. Defaults to True.
        """
        if self.poll():
            self._future.cancel()
        # The flux future objects are not updated instantly; they can still report
        # running after cancel() has been called, so we wait until execution has completed.
        self._future.result()

    def poll(self):
        """
        Check if the FluxPythonSpawner is running.

        Returns:
            bool: True if the interface is running, False otherwise.
        """
        return self._future is not None and not self._future.done()
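
The module exposes one helper and one spawner class. The sketch below is a minimal,
hypothetical usage example, not taken from the repository: it assumes a running Flux
instance, and the working directory /tmp/flux_demo, the script name my_script.py, and
the chosen worker counts are placeholders.

    import time

    import flux.job

    from executorlib.interactive.flux import FluxPythonSpawner, validate_max_workers

    # Fail early if the hypothetical request (2 workers * 1 core * 1 thread) exceeds
    # the cores reported as up by the running Flux instance.
    validate_max_workers(max_workers=2, cores=1, threads_per_core=1)

    spawner = FluxPythonSpawner(
        cwd="/tmp/flux_demo",                   # placeholder working directory
        cores=1,
        flux_executor=flux.job.FluxExecutor(),  # reuse an existing executor if you have one
        flux_log_files=True,                    # flux.out / flux.err are written into cwd
    )
    spawner.bootup(command_lst=["python", "my_script.py"])  # placeholder command
    while spawner.poll():                       # True while the submitted Flux job is running
        time.sleep(1)
    spawner.shutdown(wait=True)                 # cancels a running job and waits for completion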