• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jtmoon79 / super-speedy-syslog-searcher / 26325382104

23 May 2026 06:07AM UTC coverage: 68.035% (-0.009%) from 68.044%
26325382104

push

github

jtmoon79
(CI) add workflow_dispatch

15450 of 22709 relevant lines covered (68.03%)

130014.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.72
/src/python/pyrunner.rs
1
// src/python/pyrunner.rs
2

3
//! Runs a Python process instance. It communicates with the Python process
4
//! over threaded `PipeStreamReader`s connected to stdout, stderr, and stdin.
5
//! It uses `std::process::Child` to start and manage the Python process.
6

7
use std::cmp::{
8
    max,
9
    min,
10
};
11
use std::collections::{
12
    HashSet,
13
    VecDeque,
14
};
15
use std::env;
16
use std::io::{
17
    Error,
18
    ErrorKind,
19
    Read,
20
    Result,
21
    Write,
22
    stderr,
23
    stdout,
24
};
25
use std::path::PathBuf;
26
use std::process::{
27
    Child,
28
    Command,
29
    Stdio,
30
};
31
use std::sync::RwLock;
32
use std::thread;
33
use std::time::{
34
    Duration,
35
    Instant,
36
};
37

38
use ::crossbeam_channel::{
39
    Sender,
40
    Receiver,
41
    RecvError,
42
    RecvTimeoutError,
43
    Select,
44
};
45
use ::lazy_static::lazy_static;
46
use ::memchr::memmem::Finder as memchr_Finder;
47
use ::once_cell::sync::OnceCell;
48
use ::pathsearch::find_executable_in_path;
49
use ::shell_escape::escape;
50
#[allow(unused_imports)]
51
use ::si_trace_print::{
52
    defñ,
53
    defn,
54
    defo,
55
    defx,
56
    def1ñ,
57
    def1n,
58
    def1o,
59
    def1x,
60
    def2ñ,
61
    def2n,
62
    def2o,
63
    def2x,
64
    e,
65
    ef1n,
66
    ef1o,
67
    ef1x,
68
    ef1ñ,
69
    ef2n,
70
    ef2o,
71
    ef2x,
72
    ef2ñ,
73
};
74

75
use crate::{
76
    de_err,
77
    de_wrn,
78
    debug_assert_none,
79
    debug_panic,
80
};
81
use crate::common::{
82
    Bytes,
83
    Count,
84
    FPath,
85
    threadid_to_u64,
86
    summary_stat,
87
};
88
#[cfg(any(debug_assertions, test))]
89
use crate::debug::printers::buffer_to_String_noraw;
90
use crate::readers::helpers::path_to_fpath;
91
use crate::python::venv::venv_path;
92

93
/// Python process exit result; an alias of [`std::process::ExitStatus`].
94
pub type ExitStatus = std::process::ExitStatus;
95

96
/// Size of pipe read/write buffers in bytes
97
pub type PipeSz = usize;
98

99
/// Delimiter byte used to separate chunks of data read from the Python process
100
pub type ChunkDelimiter = u8;
101

102
/// Names of possible Python executables that could be found in path.
/// `find_python_executable` tries these in the order listed and uses the
/// first one found.
103
pub const PYTHON_NAMES: [&str; 13] = [
104
    "python3",
105
    "python",
106
    "python3.exe",
107
    "python.exe",
108
    "python37",
109
    "python38",
110
    "python39",
111
    "python310",
112
    "python311",
113
    "python312",
114
    "python313",
115
    "pypy3",
116
    "pypy",
117
];
118
/// Possible subdirectories within a Python installation where the Python
119
/// interpreter executable may be found.
/// The empty string entry means the installation directory itself
/// (`find_python_executable` does not push an empty subdirectory).
120
pub const PYTHON_SUBDIRS: [&str; 3] = [
121
    "bin",
122
    "Scripts",
123
    "",
124
];
125

126
/// Default prompt string.
/// NOTE(review): not referenced in the visible part of this file; confirm
/// where it is consumed.
pub const PROMPT_DEFAULT: &str = "$ ";
127

128
/// Capacity of the bounded channel that carries `ProcessStatus` messages
/// from the parent to a `PipeStreamReader` thread
/// (see `PipeStreamReader::new`).
pub const CHANNEL_CAPACITY: usize = 16;
129

130
/// Environment variable that refers to the exact path to a Python interpreter
131
/// executable
132
pub const PYTHON_ENV: &str = "S4_PYTHON";
133

134
/// default timeout for Pipe `recv_timeout` when reading from the child Python processes
135
pub const RECV_TIMEOUT: Duration = Duration::from_millis(5);
136

137
/// cached Python path found in environment variable `S4_PYTHON`.
138
/// set in `find_python_executable`.
/// A cached `None` means the variable was unset, unreadable, or empty when
/// first looked up; the lookup is not repeated.
139
#[allow(non_upper_case_globals)]
140
pub static PythonPathEnv: OnceCell<Option<FPath>> = OnceCell::new();
141
/// cached Python path found in path, set in `find_python_executable`.
/// A cached `None` means none of `PYTHON_NAMES` was found on first lookup.
142
#[allow(non_upper_case_globals)]
143
pub static PythonPathPath: OnceCell<Option<FPath>> = OnceCell::new();
144
/// cached Python path in s4 venv, set in `find_python_executable`.
/// A cached `None` means no candidate file existed on first lookup.
145
#[allow(non_upper_case_globals)]
146
pub static PythonPathVenv: OnceCell<Option<FPath>> = OnceCell::new();
147

148
lazy_static! {
149
    /// Summary statistic.
150
    /// Record which Python interpreters ran.
151
    /// only intended for summary printing
    ///
    /// Starts as an empty set.
    /// NOTE(review): no writer is visible in this part of the file; confirm
    /// where paths are inserted.
152
    pub static ref PythonPathsRan: RwLock<HashSet<FPath>> = {
153
        defñ!("init PythonPathsRan");
154

155
        RwLock::new(HashSet::<FPath>::new())
156
    };
157
}
158

159
/// Strategy for locating the Python interpreter executable.
/// Passed to `find_python_executable`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
160
pub enum PythonToUse {
161
    /// only use Python referred to by environment variable `S4_PYTHON`
162
    Env,
163
    /// only use Python found in the `PATH`
164
    Path,
165
    /// use Python referred to by environment variable `S4_PYTHON` if set
166
    /// but if not set then use Python found in the `PATH`
167
    EnvPath,
168
    /// only use Python in predetermined s4 .venv
169
    Venv,
170
    /// use Python referred to by environment variable `S4_PYTHON` if set
171
    /// but if not set then use Python in predetermined s4 .venv
172
    EnvVenv,
173
    /// use a passed value.
    /// Not handled by `find_python_executable`, which calls `debug_panic!`
    /// and returns `&None` for this variant.
174
    Value,
175
}
176

177
/// find a Python executable.
178
/// `python_to_use` instructs how to find the Python executable.
179
/// Does not check if the found Python executable is valid.
180
/// Caches found paths for reliability among threads.
181
/// Returns `None` when passed `PythonToUse::Value`.
182
pub fn find_python_executable(python_to_use: PythonToUse) -> &'static Option<FPath> {
85✔
183
    defn!("{:?}", python_to_use);
85✔
184

185
    match python_to_use {
85✔
186
        PythonToUse::Env => {
187
            let ret: &Option<FPath> = PythonPathEnv.get_or_init(||
12✔
188
                // check process environment variable
189
                match env::var(PYTHON_ENV) {
1✔
190
                    Ok(val) => {
×
191
                        defo!("env::var found {}={:?}", PYTHON_ENV, val);
×
192
                        if ! val.is_empty() {
×
193
                            Some(val)
×
194
                        } else {
195
                            None
×
196
                        }
197
                    }
198
                    Err (_err) => {
1✔
199
                        defo!("env::var did not find {:?}; {:?}", PYTHON_ENV, _err);
1✔
200
                        None
1✔
201
                    }
202
                }
1✔
203
            );
204
            defx!("{:?}, return {:?}", python_to_use, ret);
12✔
205

206
            ret
12✔
207
        }
208
        PythonToUse::Path => {
209
            let ret: &Option<FPath> = PythonPathPath.get_or_init(||{
42✔
210
                let mut python_path: Option<PathBuf> = None;
1✔
211
                // check PATH for python executable
212
                for name in PYTHON_NAMES.iter() {
1✔
213
                    defo!("find_executable_in_path({:?})", name);
1✔
214
                    if let Some(p) = find_executable_in_path(name) {
1✔
215
                        defo!("find_executable_in_path returned {:?}", p);
1✔
216
                        python_path = Some(p);
1✔
217
                        break;
1✔
218
                    };
×
219
                }
220
                if let Some(p) = python_path {
1✔
221
                    Some(path_to_fpath(p.as_path()))
1✔
222
                } else {
223
                    None
×
224
                }
225
            });
1✔
226
            defx!("{:?}, return {:?}", python_to_use, ret);
42✔
227

228
            ret
42✔
229
        }
230
        PythonToUse::EnvPath => {
231
            // try Env then try Path
232
            let p = find_python_executable(PythonToUse::Env);
1✔
233
            if p.is_some() {
1✔
234
                defx!("{:?}, return {:?}", python_to_use, p);
×
235
                return p;
×
236
            }
1✔
237
            let p = find_python_executable(PythonToUse::Path);
1✔
238
            defx!("{:?}, return {:?}", python_to_use, p);
1✔
239

240
            p
1✔
241
        }
242
        PythonToUse::Venv => {
243
            let ret: &Option<FPath> = PythonPathVenv.get_or_init(||{
19✔
244
                // get the venv path
245
                let venv: PathBuf = venv_path();
1✔
246
                defo!("venv={:?}", venv);
1✔
247
                // look for common subdirectories of Python virtual environments where the
248
                // Python executable may be found
249
                // XXX: we could try to do this by platform
250
                //      i.e. on Windows only look in "Scripts", etc.
251
                //      but this is fine
252
                for dir in PYTHON_SUBDIRS.iter() {
1✔
253
                    let mut venv_dir = venv.clone();
1✔
254
                    if ! dir.is_empty() {
1✔
255
                        venv_dir.push(dir);
1✔
256
                    }
1✔
257
                    for name in PYTHON_NAMES.iter() {
1✔
258
                        let mut venv_name = venv_dir.clone();
1✔
259
                        venv_name.push(name);
1✔
260
                        defo!("venv_name.exists?={:?}", venv_name);
1✔
261
                        if venv_name.exists() {
1✔
262
                            let fp = path_to_fpath(venv_name.as_path());
1✔
263
                            defo!("found venv python executable: {:?}", fp);
1✔
264
                            return Some(fp);
1✔
265
                        }
×
266
                    }
267
                }
268
                None
×
269
            });
1✔
270
            defx!("{:?}, return {:?}", python_to_use, ret);
19✔
271

272
            ret
19✔
273
        }
274
        PythonToUse::EnvVenv => {
275
            // try Env then try Venv
276
            let p = find_python_executable(PythonToUse::Env);
11✔
277
            if p.is_some() {
11✔
278
                defx!("{:?}, return {:?}", python_to_use, p);
×
279
                return p;
×
280
            }
11✔
281
            let p = find_python_executable(PythonToUse::Venv);
11✔
282
            defx!("{:?}, return {:?}", python_to_use, p);
11✔
283

284
            p
11✔
285
        }
286
        PythonToUse::Value => {
287
            debug_panic!("PythonToUse::Value should not be used in find_python_executable");
×
288

289
            &None
×
290
        }
291
    }
292
}
85✔
293

294
/// Message sent from a `PipeStreamReader` reader thread to the holder of
/// the `PipeStreamReader` over its chunk channel.
#[derive(Debug)]
295
enum PipedChunk {
296
    /// a chunk of bytes read from the child process
297
    Chunk(Bytes),
298
    /// process not sending but still running
299
    Continue,
300
    /// process exited or no more data to read
301
    /// contains number of reads performed and remaining bytes
    /// (bytes accumulated since the last delimiter that were never sent as
    /// a `Chunk`)
302
    Done(u64, Bytes),
303
}
304

305
/// Status of the child process, sent to a `PipeStreamReader` reader thread
/// over its exit channel.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
306
enum ProcessStatus {
307
    /// the child process is still running; the reader thread keeps reading.
    /// This is the `Default` value.
    #[default]
308
    Running,
309
    /// the child process has exited; after its next zero-byte read the
    /// reader thread sends `PipedChunk::Done` and stops
    Exited,
310
}
311

312
/// Reads data from the pipe and returns chunks of data to the caller.
312
/// Chunks are delimited by the passed `chunk_delimiter_opt` if set.
313
/// If `chunk_delimiter_opt` is `None` then each read immediately returns any data read.
314
///
315
/// Inspired by gist [ArtemGr/db40ae04b431a95f2b78](https://gist.github.com/ArtemGr/db40ae04b431a95f2b78).
316
struct PipeStreamReader {
317
    /// receives `PipedChunk` messages, or a read `Error`, from the reader
    /// thread spawned in `new`
    chunk_receiver: Receiver<core::result::Result<PipedChunk, Error>>,
318
    /// sends the child process status to the reader thread; the thread
    /// checks it after a zero-byte read
    exit_sender: Sender<ProcessStatus>,
319
}
321

322
impl PipeStreamReader {
323
    /// Starts a thread reading bytes from a child process pipe.
324
    ///
325
    /// `pipe_sz` is the size of the Pipe chunk buffer in bytes.
326
    ///
327
    /// `recv_timeout` is the timeout duration for calls to `recv_timeout()`.
328
    ///
329
    /// `chunk_delimiter_opt` is an optional byte delimiter used to separate chunks of data.
330
    /// If `None` then each read immediately returns any data read.
331
    ///
332
    /// `stream_child_proc` is the `Read` stream of the child process to read from.
333
    ///
334
    /// `name` and `pid` are used for debugging messages.
335
    /// `name` is the name of the pipe.
336
    /// `pid` is the process ID of the child process.
337
    fn new(
138✔
338
        name: String,
138✔
339
        pid: u32,
138✔
340
        pipe_sz: PipeSz,
138✔
341
        recv_timeout: Duration,
138✔
342
        chunk_delimiter_opt: Option<ChunkDelimiter>,
138✔
343
        mut stream_child_proc: Box<dyn Read + Send>
138✔
344
    ) -> PipeStreamReader
138✔
345
    {
346
        def1n!("PipeStreamReader new(pipe_sz={}, name={:?}, chunk_delimiter_opt={:?})",
138✔
347
               pipe_sz, name, chunk_delimiter_opt);
348
        def1o!("PipeStreamReader {:?} create bounded({}) channel", name, CHANNEL_CAPACITY);
138✔
349
        let (tx_exit, rx_exit) =
138✔
350
            ::crossbeam_channel::bounded(CHANNEL_CAPACITY);
138✔
351

352
        PipeStreamReader {
353
            chunk_receiver: {
354
                let thread_name: String = format!("{}_PipeStreamReader", name);
138✔
355
                let _thread_name2: String = thread_name.clone();
138✔
356
                // parent thread ID
357
                let _tidn_p: u64 = threadid_to_u64(thread::current().id());
138✔
358
                // debug message prepend
359
                let _d_p = format!(
138✔
360
                    "PipeStreamReader {:?} PID {:?} PTID {:?}",
361
                    name, pid, _tidn_p
362
                );
363
                def1o!("{_d_p} create unbounded() channel");
138✔
364
                let (tx_parent, rx_parent) =
138✔
365
                    ::crossbeam_channel::unbounded();
138✔
366

367
                let thread_pipe = thread::Builder::new().name(thread_name.clone());
138✔
368

369
                def1o!("{_d_p} spawn thread {:?}", thread_name);
138✔
370
                let result = thread_pipe.spawn(move ||
138✔
371
                {
138✔
372
                    // debug message prepend
373
                    let _d_p = format!(
138✔
374
                        "PipeStreamReader {:?} PID {:?} PTID {:?} TID {:?}",
375
                        name, pid, _tidn_p, threadid_to_u64(thread::current().id()));
138✔
376
                    def2n!("{_d_p} start, pipe_sz {}", pipe_sz);
138✔
377
                    let mut _recv_bytes: usize = 0;
138✔
378
                    let mut reads: usize = 0;
138✔
379
                    let mut _sends: usize = 0;
138✔
380
                    let mut delim_found: bool = false;
138✔
381
                    let mut buf = Bytes::with_capacity(pipe_sz);
138✔
382
                    let buf_chunk1_sz: usize = match chunk_delimiter_opt {
138✔
383
                        Some(_delim) => pipe_sz,
79✔
384
                        None => 0, // `buf_chunk1` not used
59✔
385
                    };
386
                    let mut buf_chunk1: Bytes = Bytes::with_capacity(buf_chunk1_sz);
138✔
387
                    //let mut buf_chunk2: Bytes = Bytes::with_capacity(pipe_sz);
388
                    loop {
389
                        reads += 1;
94,092✔
390
                        buf.clear();
94,092✔
391
                        buf.resize(pipe_sz, 0);
94,092✔
392

393
                        def2o!("{_d_p} stream_child_proc.read(buf capacity {}, len {})…", buf.capacity(), buf.len());
94,092✔
394
                        /*
395
                        From the docs regarding read():
396

397
                            This function does not provide any guarantees about whether it blocks
398
                            waiting for data, but if an object needs to block for a read and cannot,
399
                            it will typically signal this via an Err return value.
400

401
                        See https://doc.rust-lang.org/1.83.0/std/io/trait.Read.html#tymethod.read
402
                        */
403
                        match stream_child_proc.read(&mut buf) {
94,092✔
404
                            Ok(0) => {
405
                                def2o!("{_d_p} read zero bytes of {} total", _recv_bytes);
140✔
406
                                /*
407
                                From the docs regarding read() returning Ok(0):
408

409
                                    This reader has reached its "end of file" and will likely no
410
                                    longer be able to produce bytes. Note that this does not mean
411
                                    that the reader will always no longer be able to produce bytes.
412
                                    As an example, on Linux, this method will call the recv syscall
413
                                    for a [TcpStream], where returning zero indicates the connection
414
                                    was shut down correctly. While for [File], it is possible to
415
                                    reach the end of file and get zero as result, but if more data
416
                                    is appended to the file, future calls to read will return more
417
                                    data.
418

419
                                See https://doc.rust-lang.org/1.83.0/std/io/trait.Read.html#tymethod.read
420
                                */
421
                                if delim_found {
140✔
422
                                    delim_found = false;
45✔
423
                                }
95✔
424
                                // XXX: if the child python process has exited then this may become
425
                                //      a busy loop until the parent thread notices the python
426
                                //      process has exited and then can send a
427
                                //      `ProcessStatus::Exited`.
428
                                //      Using `recv_timeout(5ms)` softens this busy loop.
429
                                //      It's ugly but it works.
430
                                def2o!("{_d_p} rx_exit.recv_timeout({:?}) (len {})…", recv_timeout, rx_exit.len());
140✔
431
                                let rx_result = rx_exit.recv_timeout(recv_timeout);
140✔
432
                                match rx_result {
74✔
433
                                    Ok(ProcessStatus::Exited) => {
434
                                        def2o!("{_d_p} rx_exit ProcessStatus::Exited; send Done({}, buf_chunk1 {} bytes) and break",
74✔
435
                                               reads, buf_chunk1.len());
74✔
436
                                        _sends += 1;
74✔
437
                                        def2o!("{_d_p} tx_parent.send(Ok(PipedChunk::Done({}, buf_chunk1 {} bytes))) (channel len {})…",
74✔
438
                                               reads, buf_chunk1.len(), tx_parent.len());
74✔
439
                                        match tx_parent.send(Ok(PipedChunk::Done(reads as u64, buf_chunk1))) {
74✔
440
                                            Ok(_) => {}
74✔
441
                                            Err(_err) => {
×
442
                                                def2o!("{_d_p} tx send error: {:?}", _err);
×
443
                                            }
444
                                        }
445
                                        break;
74✔
446
                                    }
447
                                    Ok(ProcessStatus::Running) => {
448
                                        def2o!("{_d_p} rx_exit ProcessStatus::Running; continue reading");
×
449
                                    }
450
                                    Err(RecvTimeoutError::Timeout) => {
451
                                        def2o!("{_d_p} RecvTimeoutError::Timeout; continue reading");
11✔
452
                                    }
453
                                    Err(RecvTimeoutError::Disconnected) => {
454
                                        def2o!("{_d_p} RecvTimeoutError::Disconnected; break");
55✔
455
                                        break;
55✔
456
                                    }
457
                                }
458
                                // send Continue if no more messages to process by parent thread
459
                                if tx_parent.is_empty() {
11✔
460
                                    def2o!("{_d_p} tx_parent.send(Ok(PipedChunk::Continue))…");
11✔
461
                                    match tx_parent.send(Ok(PipedChunk::Continue)) {
11✔
462
                                        Ok(_) => {
11✔
463
                                            _sends += 1;
11✔
464
                                        }
11✔
465
                                        Err(_err) => {
×
466
                                            def2o!("{_d_p} tx send error: {:?}", _err);
×
467
                                            de_err!("{_d_p} tx send error: {:?}", _err);
×
468
                                        }
469
                                    };
470
                                }
×
471
                            }
472
                            Ok(len) => {
93,952✔
473
                                _recv_bytes += len;
93,952✔
474
                                def2o!("{_d_p} (read #{}) read {} bytes of {} total in this pipe", reads, len, _recv_bytes);
93,952✔
475
                                // is there a chunk delimiter in the buffer?
476

477
                                match chunk_delimiter_opt {
93,952✔
478
                                    Some(chunk_delimiter) => {
90,992✔
479
                                        // look for delimiter
480
                                        // TODO: [2025/12] add benchmark to compare different methods
481
                                        //       of finding a delimiter.
482
                                        //       This `find_memchr` creates a new `Finder<'_>`
483
                                        //       which may not be worth the trouble.
484
                                        let needle = &[chunk_delimiter];
90,992✔
485
                                        let finder = memchr_Finder::new(needle);
90,992✔
486
                                        let mut at: usize = 0;
90,992✔
487
                                        let mut _loop: usize = 0;
90,992✔
488
                                        while at < len {
184,634✔
489
                                            _loop += 1;
95,957✔
490
                                            def2o!("{_d_p} (read #{reads} loop {_loop}) searching for delimiter in buf[{at}..{len}] '{}'", 
95,957✔
491
                                                buffer_to_String_noraw(&buf[at..len]));
95,957✔
492
                                            match finder.find(&buf[at..len]) {
95,957✔
493
                                                Some(pos) => {
8,816✔
494
                                                    // delimiter found at pos
495
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) found delimiter at pos {} (absolute pos {}) among {} returned bytes; buf len {}, buf capacity {}",
8,816✔
496
                                                        pos, at + pos, len, buf.len(), buf.capacity());
8,816✔
497
                                                    debug_assert!(at + pos < buf.len(), "at {} + pos {} >= buf.len {}", at, pos, buf.len());
8,816✔
498
                                                    // send chunks, keep the remainder
499
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.extend_from_slice(buf[{}..{}])", at, at + pos + 1);
8,816✔
500
                                                    buf_chunk1.extend_from_slice(&buf[at..at + pos + 1]);
8,816✔
501
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: '{}'", buffer_to_String_noraw(&buf_chunk1));
8,816✔
502
                                                    let blen = buf_chunk1.len();
8,816✔
503
                                                    let mut chunk_send: Bytes = Vec::<u8>::with_capacity(blen);
8,816✔
504
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) chunk_send.extend_from_slice(&buf_chunk1 len {}) (chunk_send capacity {})",
8,816✔
505
                                                        buf_chunk1.len(), chunk_send.capacity());
8,816✔
506
                                                    chunk_send.extend_from_slice(&buf_chunk1);
8,816✔
507
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) chunk_send: '{}' (channel len {})",
8,816✔
508
                                                           buffer_to_String_noraw(&chunk_send), tx_parent.len());
8,816✔
509
                                                    let data_send = PipedChunk::Chunk(chunk_send);
8,816✔
510
                                                    _sends += 1;
8,816✔
511
                                                    match tx_parent.send(Ok(data_send)) {
8,816✔
512
                                                        Ok(_) => {
513
                                                            def2o!("{_d_p} (read #{reads} loop {_loop}) sent chunk_send {} bytes, send #{_sends}", blen);
6,501✔
514
                                                        }
515
                                                        Err(_err) => {
2,315✔
516
                                                            def2o!("{_d_p} (read #{reads} loop {_loop}) send error: {:?}", _err);
2,315✔
517
                                                            break;
2,315✔
518
                                                        }
519
                                                    }
520
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.clear()");
6,501✔
521
                                                    buf_chunk1.clear();
6,501✔
522
                                                    // def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.extend_from_slice(&buf[{}..{}]) (buf len {}, buf capacity {})",
523
                                                    //     at + pos + 1, len, buf.len(), buf.capacity());
524
                                                    // buf_chunk1.extend_from_slice(&buf[at + pos + 1..len]);
525
                                                    // def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: len {}, capacity {}; contents: '{}'",
526
                                                    //     buf_chunk1.len(), buf_chunk1.capacity(), buffer_to_String_noraw(&buf_chunk1));
527
                                                    delim_found = true;
6,501✔
528
                                                    at += pos + 1;
6,501✔
529
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) {} bytes remaining in buf", len - at);
6,501✔
530
                                                }
531
                                                None => {
532
                                                    // delimiter not found, save buffer and then read child process again
533
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) no delimiter; buf_chunk1.extend_from_slice(&buf[{}..{}]) '{}'",
87,141✔
534
                                                        at, len, buffer_to_String_noraw(&buf[at..len]));
87,141✔
535
                                                    buf_chunk1.extend_from_slice(&buf[at..len]);
87,141✔
536
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: len {}, capacity {}; contents: '{}'",
87,141✔
537
                                                        buf_chunk1.len(), buf_chunk1.capacity(), buffer_to_String_noraw(&buf_chunk1));
87,141✔
538
                                                    delim_found = false;
87,141✔
539
                                                    at += len + 1;
87,141✔
540
                                                }
541
                                            }
542
                                        }
543
                                    }
544
                                    None => {
545
                                        // no delimiter configured, send entire buffer as a chunk
546
                                        let slice_ = &buf[..len];
2,960✔
547
                                        let blen = slice_.len();
2,960✔
548
                                        let mut chunk_send: Bytes = Vec::<u8>::with_capacity(blen);
2,960✔
549
                                        chunk_send.extend_from_slice(slice_);
2,960✔
550
                                        let data_send = PipedChunk::Chunk(chunk_send);
2,960✔
551
                                        delim_found = false;
2,960✔
552
                                        def2o!("{_d_p} (read #{reads}) read {} bytes of {} total; no delimiter configured, send Chunk {} bytes",
2,960✔
553
                                            len, _recv_bytes, blen);
554
                                        _sends += 1;
2,960✔
555
                                        match tx_parent.send(Ok(data_send)) {
2,960✔
556
                                            Ok(_) => {
557
                                                def2o!("{_d_p} (read #{reads}) sent chunk_send {} bytes, send #{_sends}", blen);
2,951✔
558
                                            }
559
                                            Err(_err) => {
9✔
560
                                                def2o!("{_d_p} (read #{reads}) send error: {:?}", _err);
9✔
561
                                                break;
9✔
562
                                            }
563
                                        }
564
                                    }
565
                                }
566
                            }
567
                            Err(error) => {
×
568
                                if error.kind() == ErrorKind::Interrupted {
×
569
                                    def2o!("{_d_p} (read #{reads}) read interrupted; retry");
×
570
                                    continue;
×
571
                                }
×
572
                                def2o!("{_d_p} (read #{reads}) read error {}; send Error", error);
×
573
                                delim_found = false;
×
574
                                _sends += 1;
×
575
                                match tx_parent.send(Err(error)) {
×
576
                                    Ok(_) => {}
×
577
                                    Err(_err) => {
×
578
                                        def2o!("{_d_p} (read #{reads}) send error: {:?}", _err);
×
579
                                        break;
×
580
                                    }
581
                                }
582
                            }
583
                        }
584
                    }
585
                    def2x!(
138✔
586
                        "{_d_p} exit, received {} bytes, child process reads {}, parent thread sends {}",
587
                        _recv_bytes, reads, _sends
588
                    );
589
                });
138✔
590
                match result {
138✔
591
                    Ok(_handle) => {
138✔
592
                        def1o!("{_d_p} spawned thread {:?}", _thread_name2);
138✔
593
                    }
594
                    Err(_err) => {
×
595
                        def1o!("{_d_p} thread spawn error: {}", _err);
×
596
                    }
597
                }
598
                def1x!("{_d_p} return Receiver");
138✔
599

600
                rx_parent
138✔
601
            },
602
            exit_sender: tx_exit,
138✔
603
        }
604
    }
138✔
605
}
606

607
/// `PyRunner` is a struct that represents a Python process instance. It hides
608
/// the complications of starting and communicating with a Python process
609
/// over pipes. It uses `std::process::Child` to start and manage the Python
610
/// process. This handles the complexity of asynchronous inter-process
611
/// communication which is non-trivial to implement decently.
612
///
613
/// _XXX:_ ideally, this would use `PyO3` to communicate with a Python interpreter
614
/// instance. However, [`PyO3::Python::attach`] only creates one
615
/// Python process per Rust process.
616
/// And `PyO3` does not provide a way to create Python subprocesses. So all
617
/// Rust process threads that would use `PyO3` are bottlenecked by this
618
/// one Python process which is of course, in effect, a single-threaded process.
619
/// See [PyO3 Issue #576].
620
/// So instead each `PyRunner` instance creates a new Python process using
621
/// [`std::process::Child`] and communicates over stdout, stderr, and stdin pipes.
622
///
623
/// _XXX:_ I also tried using crate `subprocess` to manage the Python process. However,
624
/// it was not able to handle irregular asynchronous communication.
625
///
626
/// [`PyO3::Python::attach`]: https://docs.rs/pyo3/0.27.1/pyo3/marker/struct.Python.html#method.attach
627
/// [PyO3 Issue #576]: https://github.com/PyO3/pyo3/issues/576
628
pub struct PyRunner {
629
    /// handle to the Python process
630
    //pub process: subprocess::Popen,
631
    pub process: Child,
632
    pipe_stdout: PipeStreamReader,
633
    pipe_stderr: PipeStreamReader,
634
    /// arguments of the process
635
    argv: Vec<String>,
636
    /// path to Python exectuable
637
    pub python_path: FPath,
638
    /// save the `ExitStatus`
639
    exit_status: Option<ExitStatus>,
640
    pipe_stdout_eof: bool,
641
    pipe_stderr_eof: bool,
642
    /// protect against sending repeat exit messages to child pipe threads.
643
    /// only used in `pipes_exit_sender()`
644
    pipe_sent_exit: bool,
645
    /// pipe buffer size in bytes for stdout `PipeStreamReader`
646
    pub pipe_sz_stdout: PipeSz,
647
    /// pipe buffer size in bytes for stderr `PipeStreamReader`
648
    pub pipe_sz_stderr: PipeSz,
649
    /// `Instant` Python process was started.
650
    time_beg: Instant,
651
    /// `Instant` the Python process was first known to be exited.
652
    time_end: Option<Instant>,
653
    /// process ID of the Python process
654
    pid_: u32,
655
    /// this thread ID. For help during debugging.
656
    _tidn: u64,
657
    /// debug message prepend. For help during debugging.
658
    _d_p: String,
659
    /// all stderr is stored in case the process exits with an error
660
    /// this is because stderr may be `read` and only later calls to
661
    /// `poll` or `wait` may find the process has exited.
662
    ///
663
    /// oldest stderr data nearest the front
664
    stderr_all: Option<VecDeque<u8>>,
665
    /// Summary statistic.
666
    /// Maximum number of messages seen in the pipe_stdout channel.
667
    pub(crate) pipe_channel_max_stdout: Count,
668
    /// Summary statistic.
669
    /// Maximum number of messages seen in the pipe_stderr channel.
670
    pub(crate) pipe_channel_max_stderr: Count,
671
    /// Summary statistic.
672
    /// count of reads performed by pipeline thread reading Python process stdout
673
    pub(crate) count_proc_reads_stdout: Count,
674
    /// Summary statistic.
675
    /// count of reads performed by pipeline thread reading Python process stderr
676
    pub(crate) count_proc_reads_stderr: Count,
677
    /// Summary statistic.
678
    /// count of recv of pipeline thread reading Python process stdout
679
    pub(crate) count_pipe_recv_stdout: Count,
680
    /// Summary statistic.
681
    /// count of recv of pipeline thread reading Python process stderr
682
    pub(crate) count_pipe_recv_stderr: Count,
683
    /// Summary statistic.
684
    /// count of writes to Python process stdin
685
    pub(crate) count_proc_writes: Count,
686
    /// Summary statistic.
687
    /// count of polls of Python process
688
    pub(crate) count_proc_polls: Count,
689
    /// Duration of process waiting
690
    pub(crate) duration_proc_wait: Duration,
691
    /// first seen error
692
    pub(crate) error: Option<Error>,
693
}
694

695
impl std::fmt::Debug for PyRunner {
696
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
697
        f.debug_struct("PyRunner")
×
698
            .field("process", &self.process)
×
699
            .field("argv", &self.argv)
×
700
            .field("python_path", &self.python_path)
×
701
            .field("exit_status", &self.exit_status)
×
702
            .field("pipe_stdout_eof", &self.pipe_stdout_eof)
×
703
            .field("pipe_stderr_eof", &self.pipe_stderr_eof)
×
704
            .field("pipe_sent_exit", &self.pipe_sent_exit)
×
705
            .field("pipe_sz_stdout", &self.pipe_sz_stdout)
×
706
            .field("pipe_sz_stderr", &self.pipe_sz_stderr)
×
707
            .field("time_beg", &self.time_beg)
×
708
            .field("time_end", &self.time_end)
×
709
            .field("pid_", &self.pid_)
×
710
            .field("_tidn", &self._tidn)
×
711
            .finish()
×
712
    }
×
713
}
714

715
impl PyRunner {
716
    /// Create a new `PyRunner` instance.
717
    ///
718
    /// `python_to_use` indicates which Python executable to use.
719
    /// If `PythonToUse::Value` is used then `python_path` must be `Some(FPath)`.
720
    /// Otherwise `python_path` must be `None`.
721
    ///
722
    /// `argv` is the list of arguments to pass to the Python executable.
723
    pub fn new(
69✔
724
        python_to_use: PythonToUse,
69✔
725
        pipe_sz: PipeSz,
69✔
726
        recv_timeout: Duration,
69✔
727
        chunk_delimiter_stdout: Option<ChunkDelimiter>,
69✔
728
        chunk_delimiter_stderr: Option<ChunkDelimiter>,
69✔
729
        python_path: Option<FPath>,
69✔
730
        argv: Vec<&str>
69✔
731
    ) -> Result<Self> {
69✔
732
        def1n!("python_to_use {:?}, python_path {:?}, pipe_sz {:?}, chunk_delimiter_stdout {:?}, chunk_delimiter_stderr {:?}, argv {:?}",
69✔
733
            python_to_use, python_path, pipe_sz, chunk_delimiter_stdout, chunk_delimiter_stderr, argv);
734

735
        let python_path_: &FPath;
736
        // get the Python exectuble
737
        if python_to_use == PythonToUse::Value {
69✔
738
            match &python_path {
49✔
739
                Some(val) => python_path_ = val,
49✔
740
                None => {
741
                    let s = format!("PyRunner::new: python_path must be Some(FPath) when python_to_use is Value");
×
742
                    def1x!("Error InvalidInput {}", s);
×
743
                    return Result::Err(
×
744
                        Error::new(ErrorKind::InvalidInput, s)
×
745
                    );
×
746
                }
747
            }
748
        } else {
749
            debug_assert_none!(python_path, "python_path must be None unless python_to_use is Value");
20✔
750
            python_path_ = match find_python_executable(python_to_use) {
20✔
751
                Some(s) => s,
20✔
752
                None => {
753
                    let s = format!(
×
754
                        "failed to find a Python interpreter; create the Python virtual environment with command --venv, or you may specify the Python interpreter path using environment variable {}; failed",
755
                        PYTHON_ENV
756
                    );
757
                    def1x!("{}", s);
×
758
                    return Result::Err(
×
759
                        Error::new(ErrorKind::NotFound, s)
×
760
                    )
×
761
                }
762
            };
763
        }
764
        def1o!("Using Python executable: {:?}", python_path_);
69✔
765

766
        // construct argv_
767
        let mut argv_: Vec<&str> = Vec::with_capacity(argv.len() + 1);
69✔
768
        argv_.push(python_path_.as_str());
69✔
769
        for arg in argv.iter() {
198✔
770
            argv_.push(arg);
198✔
771
        }
198✔
772

773
        summary_stat!(
69✔
774
            // save this python path
775
            match PythonPathsRan.write() {
69✔
776
                Ok(mut set) => {
69✔
777
                    if ! set.contains(python_path_) {
69✔
778
                        set.insert(python_path_.clone());
2✔
779
                    }
67✔
780
                }
781
                Err(err) => {
×
782
                    def1x!("Failed to acquire write lock on PythonPathsRan: {}", err);
×
783
                    return Result::Err(
×
784
                        Error::other(
×
785
                            format!("Failed to acquire write lock on PythonPathsRan: {}", err),
×
786
                        )
×
787
                    );
×
788
                }
789
            }
790
        );
791

792
        def1o!("Command::new({:?}).args({:?}).spawn()", python_path_, Vec::from_iter(argv_.iter().skip(1)));
69✔
793
        let time_beg: Instant = Instant::now();
69✔
794
        let result = Command::new(python_path_.as_str())
69✔
795
            .args(argv_.iter().skip(1))
69✔
796
            .stdin(Stdio::piped())
69✔
797
            .stdout(Stdio::piped())
69✔
798
            .stderr(Stdio::piped())
69✔
799
            .spawn();
69✔
800
        let mut process: Child = match result {
69✔
801
            Ok(p) => p,
69✔
802
            Err(err) => {
×
803
                def1x!("Failed to start Python process: {}", err);
×
804
                return Result::Err(
×
805
                    Error::new(
×
806
                        err.kind(),
×
807
                        format!("Python process failed to start: Python path {:?}; {}",
×
808
                            python_path_, err),
×
809
                    )
×
810
                );
×
811
            }
812
        };
813

814
        // TODO: [2025/11] is there a more rustic one-liner to create Vec<String> from Vec<&str>?
815
        let mut argv: Vec<String> = Vec::with_capacity(argv_.len());
69✔
816
        for a in argv_.into_iter() {
267✔
817
            argv.push(String::from(a));
267✔
818
        }
267✔
819

820
        let pid: u32 = process.id();
69✔
821
        def1o!("Python process PID {}", pid);
69✔
822

823
        let _d_p = format!("Python process {}", pid);
69✔
824

825
        let process_stdout = match process.stdout.take() {
69✔
826
            Some(s) => s,
69✔
827
            None => {
828
                let s = format!("{_d_p} stdout was None");
×
829
                def1x!("{}", s);
×
830
                return Result::Err(
×
831
                    Error::other(s)
×
832
                );
×
833
            }
834
        };
835
        let process_stderr = match process.stderr.take() {
69✔
836
            Some(s) => s,
69✔
837
            None => {
838
                let s = format!("{_d_p} stderr was None");
×
839
                def1x!("{}", s);
×
840
                return Result::Err(
×
841
                    Error::other(s)
×
842
                );
×
843
            }
844
        };
845

846
        // create PipeStreamReaders for stdout, stderr
847
        def1o!("{_d_p} PipeStreamReader::new() stdout");
69✔
848
        let pipe_sz_stdout: usize = pipe_sz;
69✔
849
        let pipe_stdout = PipeStreamReader::new(
69✔
850
            String::from("stdout"),
69✔
851
            pid,
69✔
852
            pipe_sz_stdout,
69✔
853
            recv_timeout,
69✔
854
            chunk_delimiter_stdout,
69✔
855
            Box::new(process_stdout)
69✔
856
        );
857
        def1o!("{_d_p} PipeStreamReader::new() stderr");
69✔
858
        // stderr pipe capped at 5096 bytes
859
        let pipe_sz_stderr: usize = min(pipe_sz, 5096);
69✔
860
        let pipe_stderr = PipeStreamReader::new(
69✔
861
            String::from("stderr"),
69✔
862
            pid,
69✔
863
            pipe_sz_stderr,
69✔
864
            recv_timeout,
69✔
865
            chunk_delimiter_stderr,
69✔
866
            Box::new(process_stderr)
69✔
867
        );
868

869
        let _tidn: u64 = threadid_to_u64(thread::current().id());
69✔
870
        defx!("{_d_p} PyRunner created for Python process PID {}, TID {}", pid, _tidn);
69✔
871

872
        Result::Ok(Self {
69✔
873
            process,
69✔
874
            pipe_stdout,
69✔
875
            pipe_stderr,
69✔
876
            argv,
69✔
877
            python_path: python_path_.clone(),
69✔
878
            exit_status: None,
69✔
879
            pipe_stdout_eof: false,
69✔
880
            pipe_stderr_eof: false,
69✔
881
            pipe_sent_exit: false,
69✔
882
            pipe_sz_stdout,
69✔
883
            pipe_sz_stderr,
69✔
884
            pid_: pid,
69✔
885
            _tidn,
69✔
886
            _d_p,
69✔
887
            stderr_all: None,
69✔
888
            time_beg,
69✔
889
            time_end: None,
69✔
890
            pipe_channel_max_stdout: 0,
69✔
891
            pipe_channel_max_stderr: 0,
69✔
892
            count_proc_reads_stdout: 0,
69✔
893
            count_proc_reads_stderr: 0,
69✔
894
            count_pipe_recv_stdout: 0,
69✔
895
            count_pipe_recv_stderr: 0,
69✔
896
            count_proc_writes: 0,
69✔
897
            count_proc_polls: 0,
69✔
898
            duration_proc_wait: Duration::default(),
69✔
899
            error: None,
69✔
900
        })
69✔
901
    }
69✔
902

903
    #[allow(dead_code)]
904
    pub fn pid(&self) -> u32 {
×
905
        self.pid_
×
906
    }
×
907

908
    /// Returns the process exit status.
909
    /// If the process has not exited yet, returns `None`.
910
    pub fn exit_status(&self) -> Option<ExitStatus> {
10✔
911
        self.exit_status
10✔
912
    }
10✔
913

914
    /// Returns `true` if the process exited successfully.
915
    pub fn exit_okay(&self) -> bool {
155✔
916
        self.exit_status == Some(ExitStatus::default())
155✔
917
    }
155✔
918

919
    /// convert a `RecvError` into an `Error`
920
    fn new_error_from_recverror(&self, recverror: &RecvError) -> Error {
×
921
        Error::new(
×
922
            ErrorKind::Other,
×
923
            format!("Python process {} RecvError: {}",
×
924
                self.pid_, recverror),
925
        )
926
    }
×
927

928
    /// Returns all stderr data accumulated so far.
929
    /// oldest stderr data nearest the front
930
    pub fn stderr_all(&mut self) -> Option<&[u8]> {
×
931
        match &mut self.stderr_all {
×
932
            Some(v) => {
×
933
                v.make_contiguous();
×
934
                Some(v.as_slices().0)
×
935
            },
936
            None => None,
×
937
        }
938
    }
×
939

940
    /// Poll the Python process to see if it has exited.
941
    /// If the process has exited, returns `Some(ExitStatus)`.
942
    /// If the process is still running, returns `None`.
943
    pub fn poll(&mut self) -> Option<ExitStatus> {
9,313✔
944
        let _d_p: &String = &self._d_p;
9,313✔
945
        def1n!("{_d_p} poll()");
9,313✔
946

947
        summary_stat!(self.count_proc_polls += 1);
9,313✔
948

949
        match self.process.try_wait() {
9,313✔
950
            Ok(Some(exit_status)) => {
2,130✔
951
                if self.time_end.is_none() {
2,130✔
952
                    self.time_end = Some(Instant::now());
11✔
953
                    debug_assert_none!(self.exit_status, "exit_status should not be set yet");
11✔
954
                }
2,119✔
955
                // XXX: not sure if the subprocess returned ExitStatus would
956
                //      change on later polls, so only set it once
957
                let mut _was_exited = true;
2,130✔
958
                if self.exit_status.is_none() {
2,130✔
959
                    self.exit_status = Some(exit_status);
11✔
960
                    _was_exited = false;
11✔
961
                }
2,119✔
962
                if exit_status.success() {
2,130✔
963
                    def1x!("{_d_p} exited successfully{}", if _was_exited { " was" } else { "" });
2,120✔
964
                } else if let Some(_code) = exit_status.code() {
10✔
965
                    def1x!("{_d_p} exited with code {}{}", _code, if _was_exited { " was" } else { "" });
10✔
966
                } else {
967
                    def1x!("{_d_p} exited with status {:?}", exit_status);
×
968
                }
969
                self.pipes_exit_sender(ProcessStatus::Exited);
2,130✔
970

971
                Some(exit_status)
2,130✔
972
            },
973
            Ok(None) => {
974
                // Process is still alive
975
                def1x!("{_d_p} is still running");
7,183✔
976

977
                None
7,183✔
978
            },
979
            Err(err) => {
×
980
                def1x!("{_d_p} poll error: {}", err);
×
981
                self.error = Some(err);
×
982
                self.pipes_exit_sender(ProcessStatus::Exited);
×
983

984
                None
×
985
            }
986
        }
987
    }
9,313✔
988

989
    /// Accumulate stderr data up to a maximum number of bytes.
990
    // TODO: [2025/12] isn't there a more rustic way to do this?
991
    //       or a crate that does this?
992
    fn stderr_all_add(&mut self, stderr_data: &Bytes) {
3,764✔
993
        const MAX_STDERR_ALL_BYTES: usize = 1024;
994
        match self.stderr_all.as_mut() {
3,764✔
995
            Some(se_prior) => {
3,737✔
996
                // store as much as possible of prior + new stderr data
997
                if se_prior.len() + stderr_data.len() <= MAX_STDERR_ALL_BYTES {
3,737✔
998
                    se_prior.extend(stderr_data.iter());
1,516✔
999
                } else {
1,516✔
1000
                    // need to drop oldest prior data from the front
1001
                    let mut to_drop: usize = se_prior.len() + stderr_data.len() - MAX_STDERR_ALL_BYTES;
2,221✔
1002
                    while to_drop > 0 {
56,131✔
1003
                        se_prior.pop_front();
53,910✔
1004
                        to_drop -= 1;
53,910✔
1005
                    }
53,910✔
1006
                    // signify the front data has been cut off
1007
                    for b_ in "…".bytes() {
6,663✔
1008
                        se_prior.push_front(b_);
6,663✔
1009
                    }
6,663✔
1010
                    // separate prior data from new data
1011
                    se_prior.push_back(b'\n');
2,221✔
1012
                    se_prior.push_back(b'\n');
2,221✔
1013
                    // append the new data
1014
                    se_prior.extend(stderr_data.iter());
2,221✔
1015
                }
1016
            }
1017
            None => {
1018
                let mut v = VecDeque::<u8>::with_capacity(
27✔
1019
                    if stderr_data.len() > MAX_STDERR_ALL_BYTES {
27✔
1020
                        MAX_STDERR_ALL_BYTES
×
1021
                    } else {
1022
                        stderr_data.len()
27✔
1023
                    }
1024
                );
1025
                v.extend(stderr_data.iter());
27✔
1026
                self.stderr_all = Some(v);
27✔
1027
            }
1028
        }
1029
    }
3,764✔
1030

1031
    /// Send exit message to both stdout and stderr pipe threads.
1032
    /// May be called multiple times but only the first call has effect.
1033
    fn pipes_exit_sender(&mut self, pe: ProcessStatus) {
2,204✔
1034
        if self.pipe_sent_exit {
2,204✔
1035
            return;
2,167✔
1036
        }
37✔
1037
        def2ñ!("{} pipes_exit_sender({:?}) (channels len {}, {})",
37✔
1038
               self._d_p, pe, self.pipe_stdout.exit_sender.len(), self.pipe_stderr.exit_sender.len());
37✔
1039
        self.pipe_stdout.exit_sender.send(pe).unwrap_or(());
37✔
1040
        self.pipe_stderr.exit_sender.send(pe).unwrap_or(());
37✔
1041
        self.pipe_sent_exit = true;
37✔
1042
    }
2,204✔
1043

1044
    /// Write to `input_data` then read from the Python process stdout and
1045
    /// stderr.
1046
    /// Returns (`exited`, `stdout`, `stderr`).
1047
    ///
1048
    /// The stderr_all field accumulates all stderr data read so far. This is to help
1049
    /// when some error occurs in the Python process but the process has not yet exited
1050
    /// and then later calls to `read` find the process has exited. The earlier writes to
1051
    /// stderr are preserved in stderr_all because they often have the crucial error
1052
    /// information e.g. a Python stack trace.
1053
    pub fn write_read(&mut self, input_data: Option<&[u8]>) -> (bool, Option<Bytes>, Option<Bytes>) {
9,313✔
1054
        let _len = input_data.unwrap_or(&[]).len();
9,313✔
1055
        def1n!("{} input_data: {} bytes", self._d_p, _len);
9,313✔
1056

1057
        if let Some(_exit_status) = self.poll() {
9,313✔
1058
            def1o!("{} already exited before read", self._d_p);
2,130✔
1059
        }
7,183✔
1060

1061
        // write string, read from stdout and stderr after poll as there may still be data to read
1062
        // even if the process has exited
1063

1064
        // write to stdin
1065
        if !self.exited() {
9,313✔
1066
            if let Some(input_data_) = input_data {
7,183✔
1067
                if !input_data_.is_empty() {
27✔
1068
                    match self.process.stdin.as_mut() {
27✔
1069
                        Some(stdin) => {
27✔
1070
                            def1o!(
27✔
1071
                                "{} writing {} bytes to stdin (\"{}\")",
1072
                                self._d_p,
1073
                                input_data_.len(),
27✔
1074
                                buffer_to_String_noraw(&input_data_[..input_data_.len().min(10)]).to_string()
27✔
1075
                            );
1076
                            match stdin.write(input_data_) {
27✔
1077
                                Ok(_len) => {
27✔
1078
                                    summary_stat!(self.count_proc_writes += 1);
27✔
1079
                                    def1o!(
27✔
1080
                                        "{} wrote {} bytes to stdin, expected {} bytes",
1081
                                        self._d_p, _len, input_data_.len()
27✔
1082
                                    );
1083
                                }
1084
                                Err(_err) => {
×
1085
                                    de_err!("Error writing to Python process {} stdin: {:?}", self.pid_, _err);
×
1086
                                    self.pipes_exit_sender(ProcessStatus::Exited);
×
1087
                                }
×
1088
                            }
1089
                        }
1090
                        None => {
×
1091
                            de_err!("{} stdin is None", self._d_p);
×
1092
                        }
×
1093
                    }
1094
                } else {
1095
                    def1o!("{} no stdin data to write", self._d_p);
×
1096
                }
1097
            }
7,156✔
1098
        } else {
1099
            def1o!("{} has exited; skip writing to stdin", self._d_p);
2,130✔
1100
        }
1101

1102
        let _d_p: &String = &self._d_p;
9,313✔
1103
        // use select to block until either channel signals data is available
1104
        let mut sel = Select::new();
9,313✔
1105
        let mut _sel_counts: usize = 0;
9,313✔
1106
        let sel_out: usize = if !self.pipe_stdout_eof {
9,313✔
1107
            let id = sel.recv(&self.pipe_stdout.chunk_receiver) + 1; // avoid zero id
9,174✔
1108
            def1o!("{_d_p} select recv(&pipe_stdout.chunk_receiver)");
9,174✔
1109
            _sel_counts += 1;
9,174✔
1110
            id
9,174✔
1111
        } else { 0 };
139✔
1112
        let sel_err: usize = if !self.pipe_stderr_eof {
9,313✔
1113
            let id = sel.recv(&self.pipe_stderr.chunk_receiver) + 1; // avoid zero id
7,312✔
1114
            def1o!("{_d_p} select recv(&pipe_stderr.chunk_receiver)");
7,312✔
1115
            _sel_counts += 1;
7,312✔
1116
            id
7,312✔
1117
        } else { 0 };
2,001✔
1118

1119
        if sel_out == 0 && sel_err == 0 {
9,313✔
1120
            def1o!("{_d_p} both stdout and stderr EOF; return");
88✔
1121
            return (self.exited_exhausted(), None, None);
88✔
1122
        }
9,225✔
1123

1124
        def1o!("{_d_p} wait on {} selects…", _sel_counts);
9,225✔
1125
        let d1: Instant = Instant::now();
9,225✔
1126
        let sel_oper = sel.select();
9,225✔
1127
        summary_stat!(self.duration_proc_wait += d1.elapsed());
9,225✔
1128
        let sel_index: usize = sel_oper.index() + 1; // avoid zero index
9,225✔
1129
        def1o!("{_d_p} selected {}", sel_index);
9,225✔
1130

1131
        // sanity check `*_stream_eof` is not inconsistent with channel readiness
1132
        if cfg!(any(debug_assertions,test)) {
9,225✔
1133
            if sel_index == sel_out && self.pipe_stdout_eof {
9,225✔
1134
                de_wrn!("pipe_stdout_eof should not be false if sel_out is ready");
×
1135
            }
9,225✔
1136
            if sel_index == sel_err && self.pipe_stderr_eof {
9,225✔
1137
                de_wrn!("pipe_stderr_eof should not be false if sel_err is ready");
×
1138
            }
9,225✔
1139
        }
×
1140

1141
        let mut stdout_data: Option<Bytes> = None;
9,225✔
1142
        let mut stderr_data: Option<Bytes> = None;
9,225✔
1143

1144
        // avoid borrow-checker conflicts
1145
        let _d_p = ();
9,225✔
1146

1147
        match sel_index {
9,225✔
1148
            // TODO: combine these matches since they are nearly identical?
1149
            //       though stdout might be treated differently from stderr?
1150
            //       maybe the messages passed back to main thread should distinguish
1151
            //       between stdout and stderr ?
1152
            i if i == sel_out && sel_out != 0 => {
9,225✔
1153
                // read stdout
1154
                summary_stat!(self.pipe_channel_max_stdout =
5,418✔
1155
                    max(
5,418✔
1156
                        self.pipe_channel_max_stdout,
5,418✔
1157
                        self.pipe_stdout.chunk_receiver.len() as Count
5,418✔
1158
                    )
5,418✔
1159
                );
1160
                def1o!("{} recv(&pipe_stdout.chunk_receiver)…", self._d_p);
5,418✔
1161
                summary_stat!(self.count_pipe_recv_stdout += 1);
5,418✔
1162
                match sel_oper.recv(&self.pipe_stdout.chunk_receiver) {
5,418✔
1163
                    Ok(remote_result) => {
5,418✔
1164
                        match remote_result {
5,418✔
1165
                            Ok(piped_line) => {
5,418✔
1166
                                match piped_line {
5,418✔
1167
                                    PipedChunk::Chunk(chunk) => {
5,376✔
1168
                                        let len_ = chunk.len();
5,376✔
1169
                                        def1o!("{} received {} bytes from stdout", self._d_p, len_);
5,376✔
1170
                                        stdout_data = Some(Vec::with_capacity(len_ + 1));
5,376✔
1171
                                        let data = stdout_data.as_mut().unwrap();
5,376✔
1172
                                        data.extend_from_slice(chunk.as_slice());
5,376✔
1173
                                    }
1174
                                    PipedChunk::Continue => {
1175
                                        def1o!("{} stdout Continue", self._d_p);
5✔
1176
                                    }
1177
                                    PipedChunk::Done(reads, remaining_bytes) => {
37✔
1178
                                        summary_stat!(self.count_proc_reads_stdout = reads);
37✔
1179
                                        def1o!("{} stdout Done({} reads, {} remaining bytes)",
37✔
1180
                                               self._d_p, reads, remaining_bytes.len());
37✔
1181
                                        if !remaining_bytes.is_empty() {
37✔
1182
                                            stdout_data = Some(Vec::with_capacity(remaining_bytes.len() + 1));
×
1183
                                            let data = stdout_data.as_mut().unwrap();
×
1184
                                            data.extend_from_slice(remaining_bytes.as_slice());
×
1185
                                        }
37✔
1186
                                        self.pipe_stdout_eof = true;
37✔
1187
                                        self.pipes_exit_sender(ProcessStatus::Exited);
37✔
1188
                                    }
1189
                                }
1190
                            }
1191
                            Err(error) => {
×
1192
                                de_err!("Error reading from Python process {} stdout: {:?}", self.pid_, error);
×
1193
                                self.error = Some(error);
×
1194
                                self.pipe_stdout_eof = true;
×
1195
                                self.pipes_exit_sender(ProcessStatus::Exited);
×
1196
                            }
×
1197
                        }
1198
                    }
1199
                    Err(recverror) => {
×
1200
                        def1o!("{} stdout channel RecvError {}; set pipe_stdout_eof=true", self._d_p, recverror);
×
1201
                        self.error = Some(self.new_error_from_recverror(&recverror));
×
1202
                        self.pipe_stdout_eof = true;
×
1203
                        self.pipes_exit_sender(ProcessStatus::Exited);
×
1204
                    }
1205
                }
1206
            }
1207
            i if i == sel_err && sel_err != 0 => {
3,807✔
1208
                // read stderr
1209
                summary_stat!(self.pipe_channel_max_stderr =
3,807✔
1210
                    max(
3,807✔
1211
                        self.pipe_channel_max_stderr,
3,807✔
1212
                        self.pipe_stderr.chunk_receiver.len() as Count
3,807✔
1213
                    )
3,807✔
1214
                );
1215
                def1o!("{} recv(&pipe_stderr.chunk_receiver)…", self._d_p);
3,807✔
1216
                summary_stat!(self.count_pipe_recv_stderr += 1);
3,807✔
1217
                match sel_oper.recv(&self.pipe_stderr.chunk_receiver) {
3,807✔
1218
                    Ok(remote_result) => {
3,807✔
1219
                        match remote_result {
3,807✔
1220
                            Ok(piped_line) => {
3,807✔
1221
                                match piped_line {
3,807✔
1222
                                    PipedChunk::Chunk(chunk) => {
3,764✔
1223
                                        let len_ = chunk.len();
3,764✔
1224
                                        def1o!("{} received {} bytes from stderr", self._d_p, len_);
3,764✔
1225
                                        let mut data: Bytes = Bytes::with_capacity(len_);
3,764✔
1226
                                        data.extend_from_slice(chunk.as_slice());
3,764✔
1227
                                        self.stderr_all_add(&data);
3,764✔
1228
                                        stderr_data = Some(data);
3,764✔
1229
                                    }
1230
                                    PipedChunk::Continue => {
1231
                                        def1o!("{} stderr Continue", self._d_p);
6✔
1232
                                    }
1233
                                    PipedChunk::Done(reads, remaining_bytes) => {
37✔
1234
                                        summary_stat!(self.count_proc_reads_stderr = reads);
37✔
1235
                                        def1o!("{} stderr Done({} reads, {} remaining bytes)",
37✔
1236
                                               self._d_p, reads, remaining_bytes.len());
37✔
1237
                                        if !remaining_bytes.is_empty() {
37✔
1238
                                            let mut data: Bytes = Bytes::with_capacity(remaining_bytes.len());
×
1239
                                            data.extend_from_slice(remaining_bytes.as_slice());
×
1240
                                            self.stderr_all_add(&data);
×
1241
                                            stderr_data = Some(data);
×
1242
                                        }
37✔
1243
                                        self.pipe_stderr_eof = true;
37✔
1244
                                        self.pipes_exit_sender(ProcessStatus::Exited);
37✔
1245
                                    }
1246
                                }
1247
                            }
1248
                            Err(error) => {
×
1249
                                de_err!("Error reading from Python process {} stderr: {:?}", self.pid_, error);
×
1250
                                self.error = Some(error);
×
1251
                                self.pipe_stderr_eof = true;
×
1252
                                self.pipes_exit_sender(ProcessStatus::Exited);
×
1253
                            }
×
1254
                        }
1255
                    }
1256
                    Err(_err) => {
×
1257
                        def1o!("{} stderr channel RecvError {}; set pipe_stderr_eof=true", self._d_p, _err);
×
1258
                        self.error = Some(self.new_error_from_recverror(&_err));
×
1259
                        self.pipe_stderr_eof = true;
×
1260
                        self.pipes_exit_sender(ProcessStatus::Exited);
×
1261
                    }
1262
                }
1263
            }
1264
            _i => {
×
1265
                def1o!("{} selected unknown index {}", self._d_p, _i);
×
1266
            }
1267
        }
1268

1269
        def1x!("{} return ({}, stdout bytes {:?} (eof? {}), stderr bytes {:?} (eof? {}))",
9,225✔
1270
                self._d_p,
1271
                self.exited_exhausted(),
9,225✔
1272
                stdout_data.as_ref().unwrap_or(&vec![]).len(),
9,225✔
1273
                self.pipe_stdout_eof,
1274
                stderr_data.as_ref().unwrap_or(&vec![]).len(),
9,225✔
1275
                self.pipe_stderr_eof
1276
        );
1277

1278
        (self.exited_exhausted(), stdout_data, stderr_data)
9,225✔
1279
    }
9,313✔
1280

1281
    /// Has a `subprocess::poll` or `subprocess::wait` already returned an `ExitStatus`?
1282
    pub fn exited(&self) -> bool {
9,359✔
1283
        self.exit_status.is_some()
9,359✔
1284
    }
9,359✔
1285

1286
    /// Has a `subprocess::poll` or `subprocess::wait` already returned an `ExitStatus`
1287
    /// *and* have both stdout and stderr streams reached EOF?
1288
    pub fn exited_exhausted(&self) -> bool {
18,684✔
1289
        self.exit_status.is_some() && self.pipe_stdout_eof && self.pipe_stderr_eof
18,684✔
1290
    }
18,684✔
1291

1292
    /// Wait for the Python process to exit.
1293
    /// If the process has already exited then return the saved `ExitStatus`.
1294
    pub fn wait(&mut self) -> Result<ExitStatus> {
×
1295
        let _d_p: &String = &self._d_p;
×
1296
        if self.exited() {
×
1297
            def1ñ!("{_d_p} exited; return {:?}",
×
1298
                   self.exit_status.unwrap());
×
1299
            return Ok(self.exit_status.unwrap());
×
1300
        }
×
1301
        def1n!("{_d_p} wait()");
×
1302
        let d1: Instant = Instant::now();
×
1303
        // XXX: should `wait` be passed a timeout?
1304
        let rc = self.process.wait();
×
1305
        summary_stat!(self.duration_proc_wait += d1.elapsed());
×
1306
        match rc {
×
1307
            Ok(exit_status) => {
×
1308
                debug_assert_none!(self.time_end, "time_end should not be set since exited() was false");
×
1309
                self.time_end = Some(Instant::now());
×
1310
                // prefer the first saved `ExitStatus`
1311
                // however setting `self.exit_status` again is never expected to happen
1312
                if self.exit_status.is_none() {
×
1313
                    self.exit_status = Some(exit_status);
×
1314
                } else {
×
1315
                    debug_panic!("Python process {} exit_status is already set! {:?}",
×
1316
                                 self.pid_, self.exit_status)
1317
                }
1318
                def1x!("{_d_p} wait returned {:?}", exit_status);
×
1319
                return Ok(self.exit_status.unwrap());
×
1320
            }
1321
            Err(error) => {
×
1322
                de_err!("{_d_p} wait returned {:?}", error);
×
1323
                def1x!("{_d_p} error wait returned {:?}", error);
×
1324
                return Result::Err(
×
1325
                    Error::new(
×
1326
                        error.kind(),
×
1327
                        format!("Python process {} wait() failed: {}", self.pid_, error),
×
1328
                    )
×
1329
                );
×
1330
            }
1331
        }
1332
    }
×
1333

1334
    /// Total run duration of the process; imprecise as the end time is merely the first `Instant`
1335
    /// a `subprocess::poll` or `subprocess::wait` returned an `ExitStatus`.
1336
    /// Precise enough for most needs.
1337
    ///
1338
    /// Returns `None` if the process is not yet known to have exited.
1339
    pub fn duration(&self) -> Option<Duration> {
26✔
1340
        self.time_end?;
26✔
1341
        match self.time_end {
26✔
1342
            Some(time_end) => Some(time_end - self.time_beg),
26✔
1343
            None => {
1344
                debug_panic!("Python process {} is exited but time_end is None", self.pid_);
×
1345

1346
                None
×
1347
            }
1348
        }
1349
    }
26✔
1350

1351
    /// Read from the `PyRunner`, print the output, and wait for it to finish.
1352
    /// Do not call `read` or `wait` before calling this function.
1353
    /// Helper for simple Python commands that do not require interaction.
1354
    ///
1355
    /// This is not expected to be run as part of normal operation of
1356
    /// `s4`. This is for special operations such as `s4 --venv`. It prints
1357
    /// to stdout. In normal operation of `s4`, only the main
1358
    /// thread should print to stdout.
1359
    pub fn run(&mut self, print_argv: bool, print_stdout: bool, print_stderr: bool) -> Result<(Bytes, Bytes)> {
26✔
1360
        let _d_p: &String = &self._d_p;
26✔
1361
        def1n!("{_d_p}, print_argv={}", print_argv);
26✔
1362

1363
        if self.exited() {
26✔
1364
            debug_panic!("{_d_p} already exited!");
×
1365
            return Result::Ok((Bytes::with_capacity(0), Bytes::with_capacity(0)));
×
1366
        }
26✔
1367

1368
        if print_argv {
26✔
1369
            // print the command executed
1370

1371
            // get the prompt, prefer to use PS4 env var
1372
            let prompt = match env::var("PS4") {
10✔
1373
                Ok(s) => {
×
1374
                    if s.is_empty() {
×
1375
                        String::from(PROMPT_DEFAULT)
×
1376
                    } else {
1377
                        s
×
1378
                    }
1379
                },
1380
                Err(_) => String::from(PROMPT_DEFAULT),
10✔
1381
            };
1382
            // print the command, escaping each argument
1383
            let mut lock = stdout().lock();
10✔
1384
            // TODO: [2025/11/17] handle write returning an error?
1385
            let _ = lock.write(prompt.as_bytes());
10✔
1386
            for arg in self.argv.iter() {
51✔
1387
                let es = escape(arg.into());
51✔
1388
                let _ = lock.write(es.as_bytes());
51✔
1389
                let _ = lock.write(b" ");
51✔
1390
            }
51✔
1391
            let _ = lock.write(b"\n");
10✔
1392
            let _ = lock.flush();
10✔
1393
        }
16✔
1394

1395
        let rc = self.process.wait();
26✔
1396
        def1o!("{_d_p} wait() returned {:?}", rc);
26✔
1397
        match rc {
26✔
1398
            Ok(exit_status) => {
26✔
1399
                debug_assert_none!(self.time_end, "time_end should not be set since exited() was false");
26✔
1400
                self.time_end = Some(Instant::now());
26✔
1401
                // prefer the first saved `ExitStatus`
1402
                // however setting `self.exit_status` again is never expected to happen
1403
                if self.exit_status.is_none() {
26✔
1404
                    self.exit_status = Some(exit_status);
26✔
1405
                } else {
26✔
1406
                    debug_panic!("{_d_p} exit_status is already set! {:?}",
×
1407
                                 self.exit_status)
1408
                }
1409
            }
1410
            Err(error) => {
×
1411
                de_err!("{_d_p} wait returned {:?}", error);
×
1412
                def1x!("{_d_p} error wait returned {:?}", error);
×
1413
                return Result::Err(
×
1414
                    Error::new(
×
1415
                        error.kind(),
×
1416
                        format!("Python process {} wait() failed: {}", self.pid_, error),
×
1417
                    )
×
1418
                );
×
1419
            }
1420
        }
1421

1422
        let mut stdout_data: Bytes = Bytes::with_capacity(2056);
26✔
1423
        let mut stderr_data: Bytes = Bytes::with_capacity(1024);
26✔
1424

1425
        // remove _d_p reference to avoid borrow checker conflict that would occur in the loop
1426
        let _d_p = ();
26✔
1427

1428
        // print remaining stdout and stderr
1429
        loop {
1430
            let (
1431
                _exited,
2,008✔
1432
                out_data,
2,008✔
1433
                err_data,
2,008✔
1434
            ) = self.write_read(None);
2,008✔
1435
            def1o!("{} exited? {:?}", self._d_p, _exited);
2,008✔
1436
            // print stdout to stdout
1437
            if let Some(data) = out_data {
2,008✔
1438
                stdout_data.extend_from_slice(data.as_slice());
1,903✔
1439
                if ! data.is_empty() && print_stdout {
1,903✔
1440
                    let mut lock = stdout().lock();
1,879✔
1441
                    let _ = lock.write(data.as_slice());
1,879✔
1442
                    let _ = lock.flush();
1,879✔
1443
                }
1,879✔
1444
            }
105✔
1445
            // print stderr to stderr
1446
            if let Some(data) = err_data {
2,008✔
1447
                stderr_data.extend_from_slice(data.as_slice());
53✔
1448
                if ! data.is_empty() && print_stderr {
53✔
1449
                    let mut lock = stderr().lock();
27✔
1450
                    let _ = lock.write(data.as_slice());
27✔
1451
                    let _ = lock.flush();
27✔
1452
                }
27✔
1453
            }
1,955✔
1454
            if _exited {
2,008✔
1455
                break;
26✔
1456
            }
1,982✔
1457
        }
1458

1459
        let _d_p: &String = &self._d_p;
26✔
1460

1461
        match self.exit_status {
26✔
1462
            Some(status) => {
26✔
1463
                if ! status.success() {
26✔
1464
                    let s = format!("Python process {} exited with non-zero status {:?}", self.pid_, status);
×
1465
                    def1x!("{_d_p} {}", s);
×
1466
                    return Result::Err(
×
1467
                        Error::other(s)
×
1468
                    )
×
1469
                }
26✔
1470
            }
1471
            None => {
1472
                debug_panic!("{_d_p} exit_status is None after wait()");
×
1473
            }
1474
        }
1475

1476
        if print_argv {
26✔
1477
            let mut lock = stdout().lock();
10✔
1478
            let _ = lock.write(b"\n");
10✔
1479
            let _ = lock.flush();
10✔
1480
        }
16✔
1481

1482
        def1x!("{_d_p} duration {:?}, return Ok(stdout {} bytes, stderr {} bytes)",
26✔
1483
            self.duration(), stdout_data.len(), stderr_data.len());
26✔
1484

1485
        Result::Ok((stdout_data, stderr_data))
26✔
1486
    }
26✔
1487

1488
    /// Create a `PyRunner`, run it, return Ok or Err.
1489
    ///
1490
    /// This calls `PyRunner::new()` and then `PyRunner::run()`.
1491
    /// See `run()` regarding its intended use.
1492
    pub fn run_once(
17✔
1493
        python_to_use: PythonToUse,
17✔
1494
        pipe_sz: PipeSz,
17✔
1495
        recv_timeout: Duration,
17✔
1496
        chunk_delimiter: ChunkDelimiter,
17✔
1497
        python_path: Option<FPath>,
17✔
1498
        argv: Vec<&str>,
17✔
1499
        print_argv: bool
17✔
1500
    ) -> Result<(PyRunner, Bytes, Bytes)> {
17✔
1501
        def1ñ!("({:?}, {:?}, {:?})", python_to_use, python_path, argv);
17✔
1502
        let mut pyrunner = match PyRunner::new(
17✔
1503
            python_to_use,
17✔
1504
            pipe_sz,
17✔
1505
            recv_timeout,
17✔
1506
            Some(chunk_delimiter),
17✔
1507
            None,
17✔
1508
            python_path,
17✔
1509
            argv,
17✔
1510
        ) {
17✔
1511
            Ok(pyrunner) => pyrunner,
17✔
1512
            Err(err) => {
×
1513
                def1x!("PyRunner::new failed {:?}", err);
×
1514
                return Result::Err(err);
×
1515
            }
1516
        };
1517

1518
        match pyrunner.run(print_argv, true, true) {
17✔
1519
            Ok((stdout_data, stderr_data)) => {
17✔
1520
                def1x!("PyRunner::run Ok");
17✔
1521

1522
                Result::Ok((pyrunner, stdout_data, stderr_data))
17✔
1523
            }
1524
            Err(err) => {
×
1525
                def1x!("PyRunner::run Error {:?}", err);
×
1526

1527
                Result::Err(err)
×
1528
            }
1529
        }
1530
    }
17✔
1531
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc