• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12365729826

17 Dec 2024 03:23AM UTC coverage: 95.409% (-0.3%) from 95.661%
12365729826

Pull #519

github

web-flow
Merge 03659b665 into c3a0ae727
Pull Request #519: Add option to write flux log files

8 of 11 new or added lines in 4 files covered. (72.73%)

3 existing lines in 2 files now uncovered.

956 of 1002 relevant lines covered (95.41%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
/executorlib/interactive/flux.py
1
import os
1✔
2
from typing import Optional
1✔
3

4
import flux.job
1✔
5

6
from executorlib.standalone.interactive.spawner import BaseSpawner
1✔
7

8

9
class FluxPythonSpawner(BaseSpawner):
1✔
10
    """
11
    A class representing the FluxPythonSpawner, which submits the client command as a job to a FluxExecutor.
12

13
    Args:
14
        cwd (str, optional): The current working directory. Defaults to None.
15
        cores (int, optional): The number of cores. Defaults to 1.
16
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
17
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
18
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
19
        flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
20
        flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
21
        flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
22
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
23
    """
24

25
    def __init__(
1✔
26
        self,
27
        cwd: Optional[str] = None,
28
        cores: int = 1,
29
        threads_per_core: int = 1,
30
        gpus_per_core: int = 0,
31
        openmpi_oversubscribe: bool = False,
32
        flux_executor: Optional[flux.job.FluxExecutor] = None,
33
        flux_executor_pmi_mode: Optional[str] = None,
34
        flux_executor_nesting: bool = False,
35
        flux_log_files: bool = False,
36
    ):
37
        super().__init__(
1✔
38
            cwd=cwd,
39
            cores=cores,
40
            openmpi_oversubscribe=openmpi_oversubscribe,
41
        )
42
        self._threads_per_core = threads_per_core
1✔
43
        self._gpus_per_core = gpus_per_core
1✔
44
        self._flux_executor = flux_executor
1✔
45
        self._flux_executor_pmi_mode = flux_executor_pmi_mode
1✔
46
        self._flux_executor_nesting = flux_executor_nesting
1✔
47
        self._flux_log_files = flux_log_files
1✔
48
        self._future = None
1✔
49

50
    def bootup(
1✔
51
        self,
52
        command_lst: list[str],
53
    ):
54
        """
55
        Boot up the client process to connect to the SocketInterface.
56

57
        Args:
58
            command_lst (list[str]): List of strings to start the client process.
59
        Raises:
60
            ValueError: If oversubscribing is requested, as it is currently not supported for the Flux adapter.
61
        """
62
        if self._openmpi_oversubscribe:
1✔
63
            raise ValueError(
1✔
64
                "Oversubscribing is currently not supported for the Flux adapter."
65
            )
66
        if self._flux_executor is None:
1✔
67
            self._flux_executor = flux.job.FluxExecutor()
×
68
        if not self._flux_executor_nesting:
1✔
69
            jobspec = flux.job.JobspecV1.from_command(
1✔
70
                command=command_lst,
71
                num_tasks=self._cores,
72
                cores_per_task=self._threads_per_core,
73
                gpus_per_task=self._gpus_per_core,
74
                num_nodes=None,
75
                exclusive=False,
76
            )
77
        else:
78
            jobspec = flux.job.JobspecV1.from_nest_command(
×
79
                command=command_lst,
80
                num_slots=self._cores,
81
                cores_per_slot=self._threads_per_core,
82
                gpus_per_slot=self._gpus_per_core,
83
                num_nodes=None,
84
                exclusive=False,
85
            )
86
        jobspec.environment = dict(os.environ)
1✔
87
        if self._flux_executor_pmi_mode is not None:
1✔
88
            jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
1✔
89
        if self._cwd is not None:
1✔
90
            jobspec.cwd = self._cwd
×
91
        if self._flux_log_files and self._cwd is not None:
1✔
NEW
UNCOV
92
            jobspec.stderr = os.path.join(self._cwd, "flux.err")
×
NEW
UNCOV
93
            jobspec.stdout = os.path.join(self._cwd, "flux.out")
×
94
        self._future = self._flux_executor.submit(jobspec)
1✔
95

96
    def shutdown(self, wait: bool = True):
1✔
97
        """
98
        Shut down the FluxPythonSpawner.
99

100
        Args:
101
            wait (bool, optional): Whether to wait for the execution to complete. Defaults to True. Currently not read by this method, which always waits for the job to finish.
102
        """
103
        if self.poll():
1✔
104
            self._future.cancel()
1✔
105
        # The flux future objects are not instantly updated,
106
        # still showing running after cancel was called,
107
        # so we wait until the execution is completed.
108
        self._future.result()
1✔
109

110
    def poll(self):
1✔
111
        """
112
        Check if the FluxPythonSpawner is running.
113

114
        Returns:
115
            bool: True if the interface is running, False otherwise.
116
        """
117
        return self._future is not None and not self._future.done()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc