pyiron / executorlib / 13116578816

03 Feb 2025 03:11PM UTC coverage: 96.536% (remained the same)

Pull Request #555: Add linting (github / web-flow)
Merge 70bacefb9 into 5ec2a2015

14 of 15 new or added lines in 7 files covered. (93.33%)

17 existing lines in 7 files now uncovered.

1059 of 1097 relevant lines covered (96.54%)

0.97 hits per line

Source File

/executorlib/cache/shared.py: 96.0% covered
import importlib.util
import os
import queue
import sys
from concurrent.futures import Future
from typing import Any, Callable, Optional, Tuple

from executorlib.standalone.command import get_command_path
from executorlib.standalone.hdf import dump, get_output
from executorlib.standalone.serialize import serialize_funct_h5


class FutureItem:
    def __init__(self, file_name: str):
        """
        Initialize a FutureItem object.

        Args:
            file_name (str): The name of the file.

        """
        self._file_name = file_name

    def result(self) -> Any:
        """
        Get the result of the future item.

        Returns:
            str: The result of the future item.

        """
        exec_flag, result = get_output(file_name=self._file_name)
        if exec_flag:
            return result
        else:
            return self.result()  # uncovered by tests

    def done(self) -> bool:
        """
        Check if the future item is done.

        Returns:
            bool: True if the future item is done, False otherwise.

        """
        return get_output(file_name=self._file_name)[0]


def execute_tasks_h5(
    future_queue: queue.Queue,
    cache_directory: str,
    execute_function: Callable,
    resource_dict: dict,
    terminate_function: Optional[Callable] = None,
    pysqa_config_directory: Optional[str] = None,
    backend: Optional[str] = None,
    disable_dependencies: bool = False,
) -> None:
    """
    Execute tasks stored in a queue using HDF5 files.

    Args:
        future_queue (queue.Queue): The queue containing the tasks.
        cache_directory (str): The directory to store the HDF5 files.
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - cwd (str/None): current working directory where the parallel python task is executed
        execute_function (Callable): The function to execute the tasks.
        terminate_function (Callable): The function to terminate the tasks.
        pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
        backend (str, optional): name of the backend used to spawn tasks.

    Returns:
        None

    """
    memory_dict: dict = {}
    process_dict: dict = {}
    file_name_dict: dict = {}
    while True:
        task_dict = None
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            pass
        if (
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            if terminate_function is not None:
                for task in process_dict.values():
                    terminate_function(task=task)
            future_queue.task_done()
            future_queue.join()
            break
        elif task_dict is not None:
            task_args, task_kwargs, future_wait_key_lst = _convert_args_and_kwargs(
                task_dict=task_dict,
                memory_dict=memory_dict,
                file_name_dict=file_name_dict,
            )
            task_resource_dict = task_dict["resource_dict"].copy()
            task_resource_dict.update(
                {k: v for k, v in resource_dict.items() if k not in task_resource_dict}
            )
            task_key, data_dict = serialize_funct_h5(
                fn=task_dict["fn"],
                fn_args=task_args,
                fn_kwargs=task_kwargs,
                resource_dict=task_resource_dict,
            )
            if task_key not in memory_dict:
                if task_key + ".h5out" not in os.listdir(cache_directory):
                    file_name = os.path.join(cache_directory, task_key + ".h5in")
                    dump(file_name=file_name, data_dict=data_dict)
                    if not disable_dependencies:
                        task_dependent_lst = [
                            process_dict[k] for k in future_wait_key_lst
                        ]
                    else:
                        if len(future_wait_key_lst) > 0:
                            raise ValueError(
                                "Future objects are not supported as input if disable_dependencies=True."
                            )
                        task_dependent_lst = []
                    process_dict[task_key] = execute_function(
                        command=_get_execute_command(
                            file_name=file_name,
                            cores=task_resource_dict["cores"],
                        ),
                        file_name=file_name,
                        task_dependent_lst=task_dependent_lst,
                        resource_dict=task_resource_dict,
                        config_directory=pysqa_config_directory,
                        backend=backend,
                        cache_directory=cache_directory,
                    )
                file_name_dict[task_key] = os.path.join(
                    cache_directory, task_key + ".h5out"
                )
                memory_dict[task_key] = task_dict["future"]
            future_queue.task_done()
        else:
            memory_dict = {
                key: _check_task_output(
                    task_key=key, future_obj=value, cache_directory=cache_directory
                )
                for key, value in memory_dict.items()
                if not value.done()
            }


def _get_execute_command(file_name: str, cores: int = 1) -> list:
    """
    Get command to call backend as a list of two strings

    Args:
        file_name (str): The name of the file.
        cores (int, optional): Number of cores used to execute the task. Defaults to 1.

    Returns:
        list[str]: List of strings containing the python executable path and the backend script to execute
    """
    command_lst = [sys.executable]
    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
        command_lst = (
            ["mpiexec", "-n", str(cores)]
            + command_lst
            + [get_command_path(executable="cache_parallel.py"), file_name]
        )
    elif cores > 1:
        raise ImportError(  # uncovered by tests
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        command_lst += [get_command_path(executable="cache_serial.py"), file_name]
    return command_lst


def _check_task_output(
    task_key: str, future_obj: Future, cache_directory: str
) -> Future:
    """
    Check the output of a task and set the result of the future object if available.

    Args:
        task_key (str): The key of the task.
        future_obj (Future): The future object associated with the task.
        cache_directory (str): The directory where the HDF5 files are stored.

    Returns:
        Future: The updated future object.

    """
    file_name = os.path.join(cache_directory, task_key + ".h5out")
    if not os.path.exists(file_name):
        return future_obj
    exec_flag, result = get_output(file_name=file_name)
    if exec_flag:
        future_obj.set_result(result)
    return future_obj


def _convert_args_and_kwargs(
    task_dict: dict, memory_dict: dict, file_name_dict: dict
) -> Tuple[list, dict, list]:
    """
    Convert the arguments and keyword arguments in a task dictionary to the appropriate types.

    Args:
        task_dict (dict): The task dictionary containing the arguments and keyword arguments.
        memory_dict (dict): The dictionary mapping future objects to their associated task keys.
        file_name_dict (dict): The dictionary mapping task keys to their corresponding file names.

    Returns:
        Tuple[list, dict, list]: A tuple containing the converted arguments, converted keyword arguments, and a list of future wait keys.

    """
    task_args = []
    task_kwargs = {}
    future_wait_key_lst = []
    for arg in task_dict["args"]:
        if isinstance(arg, Future):
            match_found = False
            for k, v in memory_dict.items():
                if arg == v:
                    task_args.append(FutureItem(file_name=file_name_dict[k]))
                    future_wait_key_lst.append(k)
                    match_found = True
                    break
            if not match_found:
                task_args.append(arg.result())  # uncovered by tests
        else:
            task_args.append(arg)
    for key, arg in task_dict["kwargs"].items():
        if isinstance(arg, Future):
            match_found = False
            for k, v in memory_dict.items():
                if arg == v:
                    task_kwargs[key] = FutureItem(file_name=file_name_dict[k])
                    future_wait_key_lst.append(k)
                    match_found = True
                    break
            if not match_found:
                task_kwargs[key] = arg.result()  # uncovered by tests
        else:
            task_kwargs[key] = arg
    return task_args, task_kwargs, future_wait_key_lst
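
For orientation, the sketch below shows the shape of the task dictionaries that execute_tasks_h5 pops from future_queue; the keys mirror the accesses in execute_tasks_h5 and _convert_args_and_kwargs above. It is an illustrative sketch only: the local add function is a hypothetical placeholder, and the consumer side (running execute_tasks_h5 on a background thread with a concrete execute_function) is omitted because it depends on the chosen backend.

# Illustrative sketch: producer side of the future_queue protocol consumed
# by execute_tasks_h5. The add() function is a hypothetical placeholder.
import queue
from concurrent.futures import Future


def add(x, y):
    return x + y


future_queue: queue.Queue = queue.Queue()

# One entry per task: the callable, its arguments, per-task resources and
# the Future the caller waits on (keys as accessed in execute_tasks_h5).
task_future: Future = Future()
future_queue.put(
    {
        "fn": add,
        "args": (1, 2),
        "kwargs": {},
        "resource_dict": {"cores": 1},
        "future": task_future,
    }
)

# A later task may pass an earlier task's Future as an argument;
# _convert_args_and_kwargs replaces it with a FutureItem backed by the
# cached HDF5 output and records the dependency in future_wait_key_lst.
dependent_future: Future = Future()
future_queue.put(
    {
        "fn": add,
        "args": (task_future, 3),
        "kwargs": {},
        "resource_dict": {"cores": 1},
        "future": dependent_future,
    }
)

# A shutdown message terminates the while-loop in execute_tasks_h5.
future_queue.put({"shutdown": True})

Passing the first task's Future into the second task is what exercises the FutureItem substitution and the dependency list (future_wait_key_lst) in the listing above.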