• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 13119627889

03 Feb 2025 05:51PM UTC coverage: 95.996% (-0.5%) from 96.536%
13119627889

Pull #555

github

web-flow
Merge 05f160496 into 5ec2a2015
Pull Request #555: Add linting

75 of 85 new or added lines in 16 files covered. (88.24%)

1 existing line in 1 file now uncovered.

1079 of 1124 relevant lines covered (96.0%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.11
/executorlib/cache/queue_spawner.py
1
import os
1✔
2
import subprocess
1✔
3
from typing import Optional, Union
1✔
4

5
from pysqa import QueueAdapter
1✔
6

7
from executorlib.standalone.hdf import dump, get_queue_id
1✔
8
from executorlib.standalone.inputcheck import check_file_exists
1✔
9

10

11
def execute_with_pysqa(
1✔
12
    command: list,
13
    task_dependent_lst: Optional[list[int]] = None,
14
    file_name: Optional[str] = None,
15
    resource_dict: Optional[dict] = None,
16
    config_directory: Optional[str] = None,
17
    backend: Optional[str] = None,
18
    cache_directory: Optional[str] = None,
19
) -> Optional[int]:
20
    """
21
    Execute a command by submitting it to the queuing system
22

23
    Args:
24
        command (list): The command to be executed.
25
        task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
26
        file_name (str): Name of the HDF5 file which contains the Python function
27
        resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
28
                              Example resource dictionary: {
29
                                  cwd: None,
30
                              }
31
        config_directory (str, optional): path to the config directory.
32
        backend (str, optional): name of the backend used to spawn tasks.
33
        cache_directory (str): The directory to store the HDF5 files.
34

35
    Returns:
36
        int: queuing system ID
37
    """
38
    if task_dependent_lst is None:
1✔
NEW
39
        task_dependent_lst = []
×
40
    check_file_exists(file_name=file_name)
1✔
41
    queue_id = get_queue_id(file_name=file_name)
1✔
42
    qa = QueueAdapter(
1✔
43
        directory=config_directory,
44
        queue_type=backend,
45
        execute_command=_pysqa_execute_command,
46
    )
47
    if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
1✔
48
        if resource_dict is None:
1✔
49
            resource_dict = {}
×
50
        if "cwd" in resource_dict and resource_dict["cwd"] is not None:
1✔
51
            cwd = resource_dict["cwd"]
1✔
52
        else:
53
            cwd = cache_directory
×
54
        submit_kwargs = {
1✔
55
            "command": " ".join(command),
56
            "dependency_list": [str(qid) for qid in task_dependent_lst],
57
            "working_directory": os.path.abspath(cwd),
58
        }
59
        if "cwd" in resource_dict:
1✔
60
            del resource_dict["cwd"]
1✔
61
        unsupported_keys = [
1✔
62
            "threads_per_core",
63
            "gpus_per_core",
64
            "openmpi_oversubscribe",
65
            "slurm_cmd_args",
66
        ]
67
        for k in unsupported_keys:
1✔
68
            if k in resource_dict:
1✔
69
                del resource_dict[k]
1✔
70
        if "job_name" not in resource_dict:
1✔
71
            resource_dict["job_name"] = "pysqa"
1✔
72
        submit_kwargs.update(resource_dict)
1✔
73
        queue_id = qa.submit_job(**submit_kwargs)
1✔
74
        dump(file_name=file_name, data_dict={"queue_id": queue_id})
1✔
75
    return queue_id
1✔
76

77

78
def _pysqa_execute_command(
1✔
79
    commands: str,
80
    working_directory: Optional[str] = None,
81
    split_output: bool = True,
82
    shell: bool = False,
83
    error_filename: str = "pysqa.err",
84
) -> Union[str, list[str]]:
85
    """
86
    A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
87
    fails to submit the job to the queue.
88

89
    Args:
90
        commands (str): The command(s) to be executed on the command line
91
        working_directory (str, optional): The directory where the command is executed. Defaults to None.
92
        split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
93
        shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
94
        error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".
95

96
    Returns:
97
        Union[str, List[str]]: Output of the shell command either as a string or as a list of strings
98
    """
99
    if shell and isinstance(commands, list):
1✔
100
        commands = " ".join(commands)
1✔
101
    out = subprocess.check_output(
1✔
102
        commands,
103
        cwd=working_directory,
104
        stderr=subprocess.STDOUT,
105
        universal_newlines=True,
106
        shell=not isinstance(commands, list),
107
    )
108
    if out is not None and split_output:
1✔
109
        return out.split("\n")
1✔
110
    else:
111
        return out
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc