• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pympipool / 9890133465

11 Jul 2024 10:52AM UTC coverage: 93.776%. Remained the same
9890133465

push

github

web-flow
Merge pull request #370 from pyiron/dependabot/pip/matplotlib-3.9.1

Bump matplotlib from 3.9.0 to 3.9.1

889 of 948 relevant lines covered (93.78%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.19
/pympipool/shell/interactive.py
1
import queue
1✔
2
import subprocess
1✔
3
import threading
1✔
4
from concurrent.futures import Future
1✔
5
from time import sleep
1✔
6
from typing import Optional
1✔
7

8
from pympipool.shared.executor import ExecutorBase, cancel_items_in_queue
1✔
9
from pympipool.shared.thread import RaisingThread
1✔
10

11

12
def wait_for_process_to_stop(process: threading.Thread, sleep_interval: float = 10e-10):
1✔
13
    """
14
    Wait for the subprocess.Popen() process to stop executing
15

16
    Args:
17
        process (subprocess.Popen): process object
18
        sleep_interval (float): interval to sleep during poll() calls
19
    """
20
    while process.poll() is None:
1✔
21
        sleep(sleep_interval)
1✔
22

23

24
def execute_single_task(future_queue: queue.Queue):
1✔
25
    """
26
    Process items received via the queue.
27

28
    Args:
29
        future_queue (queue.Queue):
30
    """
31
    process = None
1✔
32
    while True:
1✔
33
        task_dict = future_queue.get()
1✔
34
        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
1✔
35
            if process is not None and process.poll() is None:
1✔
36
                process.stdin.flush()
1✔
37
                process.stdin.close()
1✔
38
                process.stdout.close()
1✔
39
                process.stderr.close()
1✔
40
                process.terminate()
1✔
41
                wait_for_process_to_stop(process=process)
1✔
42
            future_queue.task_done()
1✔
43
            # future_queue.join()
44
            break
1✔
45
        elif "init" in task_dict.keys() and task_dict["init"]:
1✔
46
            process = subprocess.Popen(
1✔
47
                *task_dict["args"],
48
                stdin=subprocess.PIPE,
49
                stdout=subprocess.PIPE,
50
                stderr=subprocess.PIPE,
51
                **task_dict["kwargs"],
52
            )
53
        elif "future" in task_dict.keys():
1✔
54
            if process is None:
1✔
55
                raise ValueError("process not initialized")
×
56
            elif process.poll() is None:
1✔
57
                f = task_dict.pop("future")
1✔
58
                if f.set_running_or_notify_cancel():
1✔
59
                    try:
1✔
60
                        process.stdin.write(task_dict["input"])
1✔
61
                        process.stdin.flush()
1✔
62
                        lines_count = 0
1✔
63
                        output = ""
1✔
64
                        while True:
1✔
65
                            output_current = process.stdout.readline()
1✔
66
                            output += output_current
1✔
67
                            lines_count += 1
1✔
68
                            if (
1✔
69
                                task_dict["stop_read_pattern"] is not None
70
                                and task_dict["stop_read_pattern"] in output_current
71
                            ):
72
                                break
1✔
73
                            elif (
1✔
74
                                task_dict["lines_to_read"] is not None
75
                                and task_dict["lines_to_read"] == lines_count
76
                            ):
77
                                break
1✔
78
                        f.set_result(output)
1✔
79
                    except Exception as thread_exception:
×
80
                        future_queue.task_done()
×
81
                        f.set_exception(exception=thread_exception)
×
82
                        raise thread_exception
×
83
                    else:
84
                        future_queue.task_done()
1✔
85
            else:
86
                raise ValueError("process exited")
×
87

88

89
class ShellExecutor(ExecutorBase):
1✔
90
    """
91
    In contrast to the other pympipool.shell.SubprocessExecutor and the pympipool.Executor the pympipool.shell.ShellExecutor
92
    can only execute a single process at a given time. Still it adds the capability to interact with this process during
93
    its execution. The initialization of the pympipool.shell.ShellExecutor takes the same input arguments as the
94
    subprocess.Popen() call for the standard library to start a subprocess.
95

96
    Examples
97

98
        >>> from pympipool import ShellExecutor
99
        >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
100
        >>>     future_lines = exe.submit(string_input="4", lines_to_read=5)
101
        >>>     print(future_lines.done(), future_lines.result(), future_lines.done())
102
        (False, "0\n1\n2\n3\ndone\n", True)
103

104
        >>> from pympipool import ShellExecutor
105
        >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
106
        >>>     future_pattern = exe.submit(string_input="4", stop_read_pattern="done")
107
        >>>     print(future_pattern.done(), future_pattern.result(), future_pattern.done())
108
        (False, "0\n1\n2\n3\ndone\n", True)
109
    """
110

111
    def __init__(self, *args, **kwargs):
1✔
112
        super().__init__()
1✔
113
        self._set_process(
1✔
114
            process=RaisingThread(
115
                target=execute_single_task,
116
                kwargs={
117
                    "future_queue": self._future_queue,
118
                },
119
            ),
120
        )
121
        self._future_queue.put({"init": True, "args": args, "kwargs": kwargs})
1✔
122

123
    def submit(
1✔
124
        self,
125
        string_input: str,
126
        lines_to_read: Optional[int] = None,
127
        stop_read_pattern: Optional[str] = None,
128
    ):
129
        """
130
        Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure
131
        to identify the completion of the execution. This can either be provided based on the number of lines to read
132
        using the `lines_to_read` parameter or by providing a string pattern using the `stop_read_pattern` to stop
133
        reading new lines. One of these two stopping criteria has to be defined.
134

135
        Args:
136
            string_input (str): Input to be communicated to the underlying executable
137
            lines_to_read (None/int): integer number of lines to read from the command line (optional)
138
            stop_read_pattern (None/str): string pattern to indicate the command line output is completed (optional)
139

140
        Returns:
141
            A Future representing the given call.
142
        """
143
        if lines_to_read is None and stop_read_pattern is None:
1✔
144
            raise ValueError(
×
145
                "Either the number of lines_to_read (int) or the stop_read_pattern (str) has to be defined."
146
            )
147
        if string_input[-1:] != "\n":
1✔
148
            string_input += "\n"
1✔
149
        f = Future()
1✔
150
        self._future_queue.put(
1✔
151
            {
152
                "future": f,
153
                "input": string_input,
154
                "lines_to_read": lines_to_read,
155
                "stop_read_pattern": stop_read_pattern,
156
            }
157
        )
158
        return f
1✔
159

160
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
1✔
161
        """Clean-up the resources associated with the Executor.
162

163
        It is safe to call this method several times. Otherwise, no other
164
        methods can be called after this one.
165

166
        Args:
167
            wait: If True then shutdown will not return until all running
168
                futures have finished executing and the resources used by the
169
                parallel_executors have been reclaimed.
170
            cancel_futures: If True then shutdown will cancel all pending
171
                futures. Futures that are completed or running will not be
172
                cancelled.
173
        """
174
        if cancel_futures:
1✔
175
            cancel_items_in_queue(que=self._future_queue)
×
176
        self._future_queue.put({"shutdown": True, "wait": wait})
1✔
177
        if wait:
1✔
178
            self._process.join()
1✔
179
            # self._future_queue.join()
180
        self._process = None
1✔
181
        self._future_queue = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc