• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pympipool / 9299868304

30 May 2024 09:00AM UTC coverage: 92.014% (+0.9%) from 91.159%
9299868304

push

github

web-flow
Merge pull request #344 from pyiron/cache

Add FileExecutor

141 of 149 new or added lines in 5 files covered. (94.63%)

772 of 839 relevant lines covered (92.01%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/pympipool/cache/shared.py
1
from concurrent.futures import Future
1✔
2
import hashlib
1✔
3
import os
1✔
4
import queue
1✔
5
import re
1✔
6
import subprocess
1✔
7

8
import cloudpickle
1✔
9

10
from pympipool.cache.hdf import dump, get_output
1✔
11

12

13
class FutureItem:
    """Future-like handle on the output file of another cached task.

    Instances are handed to a dependent task in place of a
    ``concurrent.futures.Future``; the value is read back from ``file_name``
    via ``get_output``.
    """

    def __init__(self, file_name: str):
        """
        Args:
            file_name (str): Path of the upstream task's ``.h5out`` file.
        """
        self._file_name = file_name

    def result(self, poll_interval: float = 0.01):
        """Block until the output file reports completion and return its value.

        Args:
            poll_interval (float): Seconds to sleep between polls of the file.

        Returns:
            The result stored in the output file.
        """
        import time

        while True:
            exec_flag, result = get_output(file_name=self._file_name)
            if exec_flag:
                return result
            # Poll iteratively: the previous implementation recursed into
            # result() on every unfinished poll and hit RecursionError for
            # long-running tasks.
            time.sleep(poll_interval)

    def done(self):
        """Return the completion flag reported by the output file."""
        return get_output(file_name=self._file_name)[0]
26

27

28
def execute_in_subprocess(command, task_dependent_lst=None):
    """Start ``command`` once every dependency process has exited.

    Args:
        command (list): Argument list passed to ``subprocess.Popen``.
        task_dependent_lst (list, optional): ``subprocess.Popen`` handles
            that must finish (``poll()`` returns non-None) before ``command``
            is launched. The list itself is never modified.

    Returns:
        subprocess.Popen: Handle of the newly started process.
    """
    import time

    # None instead of a shared mutable default; copy so the caller's list
    # is left untouched.
    pending = [] if task_dependent_lst is None else list(task_dependent_lst)
    while pending:
        pending = [task for task in pending if task.poll() is None]
        if pending:
            # Avoid spinning a full CPU core while waiting on dependencies.
            time.sleep(0.01)
    return subprocess.Popen(command, universal_newlines=True)
34

35

36
def execute_tasks_h5(future_queue, cache_directory, execute_function):
    """Execute queued tasks in separate processes, exchanging data via files.

    Each task dict (keys ``"fn"``, ``"args"``, ``"kwargs"``, ``"future"``)
    is written to ``<cache_directory>/<task_key>.h5in`` and started with
    ``execute_function``. The result is later read from the matching
    ``.h5out`` file and set on the task's future. A task whose ``.h5out``
    file is already present in ``cache_directory`` is not executed again.
    A dict with a truthy ``"shutdown"`` entry ends the loop.

    Args:
        future_queue (queue.Queue): Queue of task dictionaries.
        cache_directory (str): Directory holding the ``.h5in``/``.h5out`` files.
        execute_function (callable): Called as
            ``execute_function(command=..., task_dependent_lst=...)``; its
            return value is passed as a dependency to later tasks.
    """
    memory_dict, process_dict, file_name_dict = {}, {}, {}
    while True:
        task_dict = None
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            pass
        if task_dict is not None and task_dict.get("shutdown", False):
            future_queue.task_done()
            future_queue.join()
            break
        elif task_dict is not None:
            task_args, task_kwargs, future_wait_key_lst = _convert_args_and_kwargs(
                task_dict=task_dict,
                memory_dict=memory_dict,
                file_name_dict=file_name_dict,
            )
            task_key, data_dict = _serialize_funct_h5(
                task_dict["fn"], *task_args, **task_kwargs
            )
            if task_key not in memory_dict:
                if task_key + ".h5out" not in os.listdir(cache_directory):
                    file_name = os.path.join(cache_directory, task_key + ".h5in")
                    dump(file_name=file_name, data_dict=data_dict)
                    process_dict[task_key] = execute_function(
                        command=_get_execute_command(file_name=file_name),
                        # Dependencies served from an existing .h5out file never
                        # started a process, so there is nothing to wait for;
                        # indexing them unconditionally raised KeyError.
                        task_dependent_lst=[
                            process_dict[k]
                            for k in future_wait_key_lst
                            if k in process_dict
                        ],
                    )
                file_name_dict[task_key] = os.path.join(
                    cache_directory, task_key + ".h5out"
                )
                memory_dict[task_key] = task_dict["future"]
            else:
                # An identical task is already pending: resolve this future
                # with the same outcome instead of silently dropping it.
                memory_dict[task_key].add_done_callback(
                    lambda done, target=task_dict["future"]: _copy_future_result(
                        done, target
                    )
                )
            future_queue.task_done()
        else:
            memory_dict = {
                key: _check_task_output(
                    task_key=key, future_obj=value, cache_directory=cache_directory
                )
                for key, value in memory_dict.items()
                if not value.done()
            }


def _copy_future_result(source, target):
    """Resolve ``target`` with the outcome of the finished future ``source``."""
    if source.cancelled():
        target.cancel()
    elif source.exception() is not None:
        target.set_exception(source.exception())
    else:
        target.set_result(source.result())
84

85

86
def _get_execute_command(file_name):
1✔
87
    return ["python", "-m", "pympipool.cache", file_name]
1✔
88

89

90
def _get_hash(binary):
1✔
91
    # Remove specification of jupyter kernel from hash to be deterministic
92
    binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary)
1✔
93
    return str(hashlib.md5(binary_no_ipykernel).hexdigest())
1✔
94

95

96
def _serialize_funct_h5(fn, *args, **kwargs):
    """Build the cache key and input payload for a task.

    Args:
        fn (callable): Function to execute.
        *args: Positional arguments for ``fn``.
        **kwargs: Keyword arguments for ``fn``.

    Returns:
        tuple: ``(task_key, data)``. ``task_key`` is the function's name
        followed by the hash of the cloudpickled payload. ``data`` is a dict
        with keys ``"fn"``, ``"args"`` and ``"kwargs"``.
    """
    data = {"fn": fn, "args": args, "kwargs": kwargs}
    # Callables such as functools.partial objects have no __name__; fall back
    # to the type name instead of raising AttributeError.
    fn_name = getattr(fn, "__name__", type(fn).__name__)
    task_key = fn_name + _get_hash(binary=cloudpickle.dumps(data))
    return task_key, data
101

102

103
def _check_task_output(task_key, future_obj, cache_directory):
1✔
104
    file_name = os.path.join(cache_directory, task_key + ".h5out")
1✔
105
    if not os.path.exists(file_name):
1✔
106
        return future_obj
1✔
107
    exec_flag, result = get_output(file_name=file_name)
1✔
108
    if exec_flag:
1✔
109
        future_obj.set_result(result)
1✔
110
    return future_obj
1✔
111

112

113
def _convert_args_and_kwargs(task_dict, memory_dict, file_name_dict):
1✔
114
    task_args = []
1✔
115
    task_kwargs = {}
1✔
116
    future_wait_key_lst = []
1✔
117
    for arg in task_dict["args"]:
1✔
118
        if isinstance(arg, Future):
1✔
119
            match_found = False
1✔
120
            for k, v in memory_dict.items():
1✔
121
                if arg == v:
1✔
122
                    task_args.append(FutureItem(file_name=file_name_dict[k]))
1✔
123
                    future_wait_key_lst.append(k)
1✔
124
                    match_found = True
1✔
125
                    break
1✔
126
            if not match_found:
1✔
NEW
127
                task_args.append(arg.result())
×
128
        else:
129
            task_args.append(arg)
1✔
130
    for key, arg in task_dict["kwargs"].items():
1✔
131
        if isinstance(arg, Future):
1✔
132
            match_found = False
1✔
133
            for k, v in memory_dict.items():
1✔
134
                if arg == v:
1✔
135
                    task_kwargs[key] = FutureItem(file_name=file_name_dict[k])
1✔
136
                    future_wait_key_lst.append(k)
1✔
137
                    match_found = True
1✔
138
                    break
1✔
139
            if not match_found:
1✔
NEW
140
                task_kwargs[key] = arg.result()
×
141
        else:
142
            task_kwargs[key] = arg
1✔
143
    return task_args, task_kwargs, future_wait_key_lst
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc