• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 11907763573

19 Nov 2024 06:52AM CUT coverage: 95.648%. Remained the same
11907763573

push

github

web-flow
[pre-commit.ci] pre-commit autoupdate (#502)

* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.7.3 → v0.7.4](https://github.com/astral-sh/ruff-pre-commit/compare/v0.7.3...v0.7.4)

* Update environment-openmpi.yml

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jan Janssen <jan-janssen@users.noreply.github.com>

945 of 988 relevant lines covered (95.65%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/executorlib/cache/queue_spawner.py
1
import os
1✔
2
import subprocess
1✔
3
from typing import List, Optional, Tuple, Union
1✔
4

5
from pysqa import QueueAdapter
1✔
6

7
from executorlib.standalone.hdf import dump, get_queue_id
1✔
8
from executorlib.standalone.inputcheck import check_file_exists
1✔
9

10

11
def execute_with_pysqa(
    command: list,
    task_dependent_lst: Optional[list[int]] = None,
    file_name: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
    cache_directory: Optional[str] = None,
) -> Optional[int]:
    """
    Execute a command by submitting it to the queuing system

    If a queue ID is already recorded for file_name and the queue adapter still reports a status for it, that ID is
    returned and nothing is submitted. Otherwise the command is submitted and the new ID is written to file_name.

    Args:
        command (list): The command to be executed. The elements are joined with single spaces before submission.
        task_dependent_lst (list, optional): Queue IDs of the jobs the current job depends on. Defaults to None,
                                             which is treated as an empty list.
        file_name (str): Name of the HDF5 file which contains the Python function
        resource_dict (dict, optional): resource dictionary, which defines the resources used for the execution of
                              the function. The dictionary passed in is not modified.
                              Example resource dictionary: {
                                  cwd: None,
                              }
                              The keys "threads_per_core", "gpus_per_core", "openmpi_oversubscribe" and
                              "slurm_cmd_args" are dropped; all remaining entries are forwarded as keyword
                              arguments to the submit call. "job_name" defaults to "pysqa".
        config_directory (str, optional): path to the config directory.
        backend (str, optional): name of the backend used to spawn tasks.
        cache_directory (str): The directory to store the HDF5 files. Used as working directory when resource_dict
                               does not provide a "cwd" other than None.

    Returns:
        int: queuing system ID

    Raises:
        TypeError: raised by os.path.abspath() when a job has to be submitted but neither resource_dict["cwd"] nor
                   cache_directory provides a directory.
    """
    check_file_exists(file_name=file_name)
    queue_id = get_queue_id(file_name=file_name)
    qa = QueueAdapter(
        directory=config_directory,
        queue_type=backend,
        execute_command=_pysqa_execute_command,
    )
    # A known job the queue adapter still reports a status for must not be submitted a second time.
    if queue_id is not None and qa.get_status_of_job(process_id=queue_id) is not None:
        return queue_id

    # Work on a shallow copy so that the caller's dictionary keeps its "cwd" and backend specific entries.
    resources = dict(resource_dict) if resource_dict is not None else {}
    cwd = resources.pop("cwd", None)
    if cwd is None:
        cwd = cache_directory
    # These keys are removed before the remaining resources are forwarded to the submit call.
    for key in (
        "threads_per_core",
        "gpus_per_core",
        "openmpi_oversubscribe",
        "slurm_cmd_args",
    ):
        resources.pop(key, None)
    resources.setdefault("job_name", "pysqa")

    submit_kwargs = {
        "command": " ".join(command),
        "dependency_list": [str(qid) for qid in (task_dependent_lst or [])],
        "working_directory": os.path.abspath(cwd),
    }
    # Applied last, so entries of the resource dictionary take precedence over the three keys set above.
    submit_kwargs.update(resources)
    queue_id = qa.submit_job(**submit_kwargs)
    # Store the ID in the task file so that a later call can find the job again.
    dump(file_name=file_name, data_dict={"queue_id": queue_id})
    return queue_id
1✔
74

75

76
def _pysqa_execute_command(
1✔
77
    commands: str,
78
    working_directory: Optional[str] = None,
79
    split_output: bool = True,
80
    shell: bool = False,
81
    error_filename: str = "pysqa.err",
82
) -> Union[str, List[str]]:
83
    """
84
    A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
85
    fails to submit the job to the queue.
86

87
    Args:
88
        commands (str): The command(s) to be executed on the command line
89
        working_directory (str, optional): The directory where the command is executed. Defaults to None.
90
        split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
91
        shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
92
        error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".
93

94
    Returns:
95
        Union[str, List[str]]: Output of the shell command either as a string or as a list of strings
96
    """
97
    if shell and isinstance(commands, list):
1✔
98
        commands = " ".join(commands)
1✔
99
    out = subprocess.check_output(
1✔
100
        commands,
101
        cwd=working_directory,
102
        stderr=subprocess.STDOUT,
103
        universal_newlines=True,
104
        shell=not isinstance(commands, list),
105
    )
106
    if out is not None and split_output:
1✔
107
        return out.split("\n")
1✔
108
    else:
109
        return out
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc