• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pympipool / 9890133465

11 Jul 2024 10:52AM UTC coverage: 93.776%. Remained the same
9890133465

push

github

web-flow
Merge pull request #370 from pyiron/dependabot/pip/matplotlib-3.9.1

Bump matplotlib from 3.9.0 to 3.9.1

889 of 948 relevant lines covered (93.78%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.58
/pympipool/cache/shared.py
1
import hashlib
1✔
2
import importlib.util
1✔
3
import os
1✔
4
import queue
1✔
5
import re
1✔
6
import subprocess
1✔
7
import sys
1✔
8
from concurrent.futures import Future
1✔
9
from typing import Tuple
1✔
10

11
import cloudpickle
1✔
12

13
from pympipool.cache.hdf import dump, get_output, load
1✔
14
from pympipool.shared.executor import get_command_path
1✔
15

16

17
class FutureItem:
    """Future-like handle whose result is backed by an HDF5 output file on disk."""

    def __init__(self, file_name: str):
        """
        Args:
            file_name (str): absolute path of the HDF5 output file to poll
        """
        self._file_name = file_name

    def result(self):
        """
        Block until the task output is available and return it.

        Returns:
            object: the result stored in the HDF5 output file
        """
        # Poll iteratively instead of recursing: the original implementation
        # called self.result() again while waiting, which can exceed the
        # interpreter recursion limit for long-running tasks.
        while True:
            exec_flag, result = get_output(file_name=self._file_name)
            if exec_flag:
                return result

    def done(self):
        """
        Check without blocking whether the task output is available.

        Returns:
            bool: True if the result can be read from the HDF5 output file
        """
        return get_output(file_name=self._file_name)[0]
30

31

32
def backend_load_file(file_name: str) -> dict:
    """
    Load a serialized task from an HDF5 file and resolve FutureItem
    placeholders among its arguments.

    Args:
        file_name (str): absolute path of the HDF5 input file

    Returns:
        dict: task dictionary with "fn", "args" and "kwargs" where every
              FutureItem has been replaced by its concrete result
    """
    task_dict = load(file_name=file_name)
    resolved_args = []
    for item in task_dict["args"]:
        resolved_args.append(item.result() if isinstance(item, FutureItem) else item)
    task_dict["args"] = resolved_args
    resolved_kwargs = {}
    for name, item in task_dict["kwargs"].items():
        resolved_kwargs[name] = item.result() if isinstance(item, FutureItem) else item
    task_dict["kwargs"] = resolved_kwargs
    return task_dict
43

44

45
def backend_write_file(file_name: str, output):
    """
    Persist the task output next to its input file.

    The ".h5in" input file is first renamed to ".h5ready", the output is
    dumped into it, and the file is finally renamed to ".h5out" so readers
    only ever observe a complete output file.

    Args:
        file_name (str): absolute path of the HDF5 input file
        output: result object to store under the "output" key
    """
    base_name = os.path.splitext(file_name)[0]
    ready_name = base_name + ".h5ready"
    os.rename(file_name, ready_name)
    dump(file_name=ready_name, data_dict={"output": output})
    os.rename(ready_name, base_name + ".h5out")
50

51

52
def execute_in_subprocess(
    command: list, task_dependent_lst: list = None
) -> subprocess.Popen:
    """
    Launch *command* in a subprocess once all dependent tasks have finished.

    Args:
        command (list): command to execute, as a list of strings
        task_dependent_lst (list): subprocess.Popen objects this task has to
            wait for before it may start; defaults to no dependencies

    Returns:
        subprocess.Popen: handle of the launched process
    """
    # Use a None sentinel: a mutable default ([]) would be shared across
    # calls and could accumulate state between invocations.
    if task_dependent_lst is None:
        task_dependent_lst = []
    # Busy-wait until every dependency has terminated (poll() is None while
    # a process is still running).
    while len(task_dependent_lst) > 0:
        task_dependent_lst = [
            task for task in task_dependent_lst if task.poll() is None
        ]
    return subprocess.Popen(command, universal_newlines=True)
60

61

62
def execute_tasks_h5(
    future_queue: queue.Queue,
    cache_directory: str,
    cores_per_worker: int,
    execute_function: callable,
):
    """
    Worker loop: pull task dictionaries from future_queue, launch them via
    execute_function and complete the associated Future objects as results
    appear in cache_directory.

    Args:
        future_queue (queue.Queue): queue of task dictionaries; a dictionary
            containing {"shutdown": True} terminates the loop
        cache_directory (str): directory holding the HDF5 input/output files
        cores_per_worker (int): cores per task (selects serial or MPI backend)
        execute_function (callable): function used to launch the backend
            command, e.g. execute_in_subprocess
    """
    # task_key -> Future, task_key -> launched process, task_key -> output file
    memory_dict, process_dict, file_name_dict = {}, {}, {}
    while True:
        task_dict = None
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            pass
        if (
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            future_queue.task_done()
            future_queue.join()
            break
        elif task_dict is not None:
            # Replace Future arguments with FutureItem handles and collect the
            # keys of the tasks this one must wait for.
            task_args, task_kwargs, future_wait_key_lst = _convert_args_and_kwargs(
                task_dict=task_dict,
                memory_dict=memory_dict,
                file_name_dict=file_name_dict,
            )
            task_key, data_dict = _serialize_funct_h5(
                task_dict["fn"], *task_args, **task_kwargs
            )
            if task_key not in memory_dict.keys():
                # Only launch when no cached output from a previous run exists.
                if task_key + ".h5out" not in os.listdir(cache_directory):
                    file_name = os.path.join(cache_directory, task_key + ".h5in")
                    dump(file_name=file_name, data_dict=data_dict)
                    process_dict[task_key] = execute_function(
                        command=_get_execute_command(
                            file_name=file_name,
                            cores=cores_per_worker,
                        ),
                        task_dependent_lst=[
                            process_dict[k] for k in future_wait_key_lst
                        ],
                    )
                file_name_dict[task_key] = os.path.join(
                    cache_directory, task_key + ".h5out"
                )
                memory_dict[task_key] = task_dict["future"]
            future_queue.task_done()
        else:
            # Idle: poll pending tasks, set results where available, and drop
            # completed futures from the tracking dictionary.
            memory_dict = {
                key: _check_task_output(
                    task_key=key, future_obj=value, cache_directory=cache_directory
                )
                for key, value in memory_dict.items()
                if not value.done()
            }
118

119

120
def execute_task_in_file(file_name: str):
    """
    Execute the task stored in a given HDF5 file

    Args:
        file_name (str): file name of the HDF5 file as absolute path
    """
    task_dict = backend_load_file(file_name=file_name)
    output = task_dict["fn"](*task_dict["args"], **task_dict["kwargs"])
    backend_write_file(file_name=file_name, output=output)
133

134

135
def _get_execute_command(file_name: str, cores: int = 1) -> list:
    """
    Assemble the command used to call the backend.

    Args:
        file_name (str): absolute path of the HDF5 input file
        cores (int): number of cores; cores > 1 selects the MPI-parallel
            backend (cache_parallel.py), otherwise cache_serial.py is used

    Returns:
        list[str]: command list starting with the python executable (prefixed
        with mpiexec for the parallel case)

    Raises:
        ImportError: if cores > 1 but mpi4py is not installed
    """
    if cores > 1:
        if importlib.util.find_spec("mpi4py") is None:
            raise ImportError(
                "mpi4py is required for parallel calculations. Please install mpi4py."
            )
        return [
            "mpiexec",
            "-n",
            str(cores),
            sys.executable,
            get_command_path(executable="cache_parallel.py"),
            file_name,
        ]
    return [
        sys.executable,
        get_command_path(executable="cache_serial.py"),
        file_name,
    ]
158

159

160
def _get_hash(binary: bytes):
1✔
161
    # Remove specification of jupyter kernel from hash to be deterministic
162
    binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary)
1✔
163
    return str(hashlib.md5(binary_no_ipykernel).hexdigest())
1✔
164

165

166
def _serialize_funct_h5(fn: callable, *args, **kwargs):
    """
    Build a unique task key and the data dictionary for a function call.

    Args:
        fn (callable): function to execute
        *args: positional arguments for fn
        **kwargs: keyword arguments for fn

    Returns:
        Tuple[str, dict]: task key (function name plus MD5 hash of the
        pickled call) and the dictionary {"fn", "args", "kwargs"}
    """
    data = {"fn": fn, "args": args, "kwargs": kwargs}
    task_key = fn.__name__ + _get_hash(binary=cloudpickle.dumps(data))
    return task_key, data
171

172

173
def _check_task_output(
    task_key: str, future_obj: Future, cache_directory: str
) -> Future:
    """
    Check whether the output file for *task_key* exists and, if the task has
    finished, set the result on its Future.

    Args:
        task_key (str): unique key identifying the task
        future_obj (Future): future associated with the task
        cache_directory (str): directory containing the HDF5 files

    Returns:
        Future: the (possibly completed) future object
    """
    output_file = os.path.join(cache_directory, task_key + ".h5out")
    if os.path.exists(output_file):
        finished, value = get_output(file_name=output_file)
        if finished:
            future_obj.set_result(value)
    return future_obj
183

184

185
def _convert_args_and_kwargs(
1✔
186
    task_dict: dict, memory_dict: dict, file_name_dict: dict
187
) -> Tuple:
188
    task_args = []
1✔
189
    task_kwargs = {}
1✔
190
    future_wait_key_lst = []
1✔
191
    for arg in task_dict["args"]:
1✔
192
        if isinstance(arg, Future):
1✔
193
            match_found = False
1✔
194
            for k, v in memory_dict.items():
1✔
195
                if arg == v:
1✔
196
                    task_args.append(FutureItem(file_name=file_name_dict[k]))
1✔
197
                    future_wait_key_lst.append(k)
1✔
198
                    match_found = True
1✔
199
                    break
1✔
200
            if not match_found:
1✔
201
                task_args.append(arg.result())
×
202
        else:
203
            task_args.append(arg)
1✔
204
    for key, arg in task_dict["kwargs"].items():
1✔
205
        if isinstance(arg, Future):
1✔
206
            match_found = False
1✔
207
            for k, v in memory_dict.items():
1✔
208
                if arg == v:
1✔
209
                    task_kwargs[key] = FutureItem(file_name=file_name_dict[k])
1✔
210
                    future_wait_key_lst.append(k)
1✔
211
                    match_found = True
1✔
212
                    break
1✔
213
            if not match_found:
1✔
214
                task_kwargs[key] = arg.result()
×
215
        else:
216
            task_kwargs[key] = arg
1✔
217
    return task_args, task_kwargs, future_wait_key_lst
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc