• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 11541431333

27 Oct 2024 02:48PM UTC coverage: 94.012% (-0.6%) from 94.582%
11541431333

push

github

web-flow
Split shared cache in backend and frontend (#443)

* Split shared cache in backend and frontend

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

2 of 2 new or added lines in 1 file covered. (100.0%)

4 existing lines in 1 file now uncovered.

785 of 835 relevant lines covered (94.01%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.4
/executorlib/shared/cache.py
1
import importlib.util
import os
import queue
import subprocess
import sys
from concurrent.futures import Future
from typing import Optional, Tuple

from executorlib.shared.command import get_command_path
from executorlib.shared.hdf import dump, get_output
from executorlib.shared.serialize import serialize_funct_h5
12

13

14
class FutureItem:
    """Lightweight future-like handle backed by a task's HDF5 output file."""

    def __init__(self, file_name: str):
        """
        Initialize a FutureItem object.

        Args:
            file_name (str): The name of the HDF5 output file to poll for the result.

        """
        self._file_name = file_name

    def result(self) -> str:
        """
        Get the result of the future item, blocking until it is available.

        Returns:
            str: The result of the future item.

        """
        # Poll in a loop instead of recursing: the original implementation
        # retried via `return self.result()`, which exhausts the interpreter
        # stack (RecursionError) while waiting on a slow task.
        while True:
            exec_flag, result = get_output(file_name=self._file_name)
            if exec_flag:
                return result

    def done(self) -> bool:
        """
        Check if the future item is done.

        Returns:
            bool: True if the future item is done, False otherwise.

        """
        # get_output returns (exec_flag, result); the flag alone signals completion.
        return get_output(file_name=self._file_name)[0]
48

49

50
def execute_in_subprocess(
    command: list, task_dependent_lst: Optional[list] = None
) -> subprocess.Popen:
    """
    Execute a command in a subprocess once all dependent subprocesses finished.

    Args:
        command (list): The command to be executed as a list of arguments.
        task_dependent_lst (list, optional): Subprocesses that the current
            subprocess depends on. Defaults to None (no dependencies).

    Returns:
        subprocess.Popen: The subprocess object.

    """
    # None sentinel instead of a mutable default argument ([]), which is a
    # well-known Python pitfall even when the list is only rebound.
    if task_dependent_lst is None:
        task_dependent_lst = []
    # Busy-wait until every dependency terminated; poll() is None while running.
    while len(task_dependent_lst) > 0:
        task_dependent_lst = [
            task for task in task_dependent_lst if task.poll() is None
        ]
    return subprocess.Popen(command, universal_newlines=True)
69

70

71
def execute_tasks_h5(
    future_queue: queue.Queue,
    cache_directory: str,
    cores_per_worker: int,
    execute_function: callable,
) -> None:
    """
    Execute tasks stored in a queue using HDF5 files.

    Runs an endless polling loop: new tasks are popped from the queue,
    serialized to HDF5 input files and handed to execute_function; in idle
    iterations the pending futures are checked for finished output files.
    The loop terminates when a task dictionary with a truthy "shutdown" key
    is received.

    Args:
        future_queue (queue.Queue): The queue containing the tasks.
        cache_directory (str): The directory to store the HDF5 files.
        cores_per_worker (int): The number of cores per worker.
        execute_function (callable): The function to execute the tasks.

    Returns:
        None

    """
    # memory_dict: task_key -> Future handed back to the caller
    # process_dict: task_key -> handle returned by execute_function (e.g. Popen)
    # file_name_dict: task_key -> path of the expected ".h5out" output file
    memory_dict, process_dict, file_name_dict = {}, {}, {}
    while True:
        task_dict = None
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            pass  # no new task; fall through to poll pending results below
        if (
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            future_queue.task_done()
            future_queue.join()
            break
        elif task_dict is not None:
            # Replace Future arguments with FutureItem handles and collect the
            # task keys whose processes this task has to wait for.
            task_args, task_kwargs, future_wait_key_lst = _convert_args_and_kwargs(
                task_dict=task_dict,
                memory_dict=memory_dict,
                file_name_dict=file_name_dict,
            )
            # task_key presumably identifies the (fn, args, kwargs) combination,
            # so identical submissions reuse the same cache entry — see
            # serialize_funct_h5 for the exact key derivation.
            task_key, data_dict = serialize_funct_h5(
                task_dict["fn"], *task_args, **task_kwargs
            )
            if task_key not in memory_dict.keys():
                # Only launch a process if no cached output file exists yet.
                if task_key + ".h5out" not in os.listdir(cache_directory):
                    file_name = os.path.join(cache_directory, task_key + ".h5in")
                    dump(file_name=file_name, data_dict=data_dict)
                    process_dict[task_key] = execute_function(
                        command=_get_execute_command(
                            file_name=file_name,
                            cores=cores_per_worker,
                        ),
                        task_dependent_lst=[
                            process_dict[k] for k in future_wait_key_lst
                        ],
                    )
                file_name_dict[task_key] = os.path.join(
                    cache_directory, task_key + ".h5out"
                )
                memory_dict[task_key] = task_dict["future"]
            future_queue.task_done()
        else:
            # Idle iteration: poll for finished outputs, set results on the
            # corresponding futures and drop completed entries.
            memory_dict = {
                key: _check_task_output(
                    task_key=key, future_obj=value, cache_directory=cache_directory
                )
                for key, value in memory_dict.items()
                if not value.done()
            }
140

141

142
def _get_execute_command(file_name: str, cores: int = 1) -> list:
    """
    Build the command used to launch the backend.

    Args:
        file_name (str): The name of the file.
        cores (int, optional): Number of cores used to execute the task. Defaults to 1.

    Returns:
        list[str]: Command as a list of strings: python executable path, backend
        script and the input file name (prefixed with mpiexec for parallel runs).
    """
    if cores > 1:
        # Parallel execution requires mpi4py; fail early with a clear message.
        if importlib.util.find_spec("mpi4py") is None:
            raise ImportError(
                "mpi4py is required for parallel calculations. Please install mpi4py."
            )
        return [
            "mpiexec",
            "-n",
            str(cores),
            sys.executable,
            get_command_path(executable="cache_parallel.py"),
            file_name,
        ]
    return [
        sys.executable,
        get_command_path(executable="cache_serial.py"),
        file_name,
    ]
167

168

169
def _check_task_output(
1✔
170
    task_key: str, future_obj: Future, cache_directory: str
171
) -> Future:
172
    """
173
    Check the output of a task and set the result of the future object if available.
174

175
    Args:
176
        task_key (str): The key of the task.
177
        future_obj (Future): The future object associated with the task.
178
        cache_directory (str): The directory where the HDF5 files are stored.
179

180
    Returns:
181
        Future: The updated future object.
182

183
    """
184
    file_name = os.path.join(cache_directory, task_key + ".h5out")
1✔
185
    if not os.path.exists(file_name):
1✔
186
        return future_obj
1✔
187
    exec_flag, result = get_output(file_name=file_name)
1✔
188
    if exec_flag:
1✔
189
        future_obj.set_result(result)
1✔
190
    return future_obj
1✔
191

192

193
def _convert_args_and_kwargs(
1✔
194
    task_dict: dict, memory_dict: dict, file_name_dict: dict
195
) -> Tuple[list, dict, list]:
196
    """
197
    Convert the arguments and keyword arguments in a task dictionary to the appropriate types.
198

199
    Args:
200
        task_dict (dict): The task dictionary containing the arguments and keyword arguments.
201
        memory_dict (dict): The dictionary mapping future objects to their associated task keys.
202
        file_name_dict (dict): The dictionary mapping task keys to their corresponding file names.
203

204
    Returns:
205
        Tuple[list, dict, list]: A tuple containing the converted arguments, converted keyword arguments, and a list of future wait keys.
206

207
    """
208
    task_args = []
1✔
209
    task_kwargs = {}
1✔
210
    future_wait_key_lst = []
1✔
211
    for arg in task_dict["args"]:
1✔
212
        if isinstance(arg, Future):
1✔
213
            match_found = False
1✔
214
            for k, v in memory_dict.items():
1✔
215
                if arg == v:
1✔
216
                    task_args.append(FutureItem(file_name=file_name_dict[k]))
1✔
217
                    future_wait_key_lst.append(k)
1✔
218
                    match_found = True
1✔
219
                    break
1✔
220
            if not match_found:
1✔
221
                task_args.append(arg.result())
×
222
        else:
223
            task_args.append(arg)
1✔
224
    for key, arg in task_dict["kwargs"].items():
1✔
225
        if isinstance(arg, Future):
1✔
226
            match_found = False
1✔
227
            for k, v in memory_dict.items():
1✔
228
                if arg == v:
1✔
229
                    task_kwargs[key] = FutureItem(file_name=file_name_dict[k])
1✔
230
                    future_wait_key_lst.append(k)
1✔
231
                    match_found = True
1✔
232
                    break
1✔
233
            if not match_found:
1✔
234
                task_kwargs[key] = arg.result()
×
235
        else:
236
            task_kwargs[key] = arg
1✔
237
    return task_args, task_kwargs, future_wait_key_lst
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc