• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jtmoon79 / super-speedy-syslog-searcher / 20704884474

05 Jan 2026 04:14AM UTC coverage: 67.869% (-0.1%) from 67.988%
20704884474

push

github

jtmoon79
(TEST) fix test_PyEventReader_new_asl_odl_panic

15705 of 23140 relevant lines covered (67.87%)

118323.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.96
/src/python/pyrunner.rs
1
// src/python/pyrunner.rs
2

3
//! Runs a Python process instance. It communicates with the Python process
4
//! over threaded `PipeStreamReader`s connected to stdout, stderr, and stdin.
5
//! It uses `std::process::Child` to start and manage the Python process.
6

7
use std::cmp::{
8
    max,
9
    min,
10
};
11
use std::collections::{
12
    HashSet,
13
    VecDeque,
14
};
15
use std::env;
16
use std::io::{
17
    Error,
18
    ErrorKind,
19
    Read,
20
    Result,
21
    Write,
22
    stderr,
23
    stdout,
24
};
25
use std::path::PathBuf;
26
use std::process::{
27
    Child,
28
    Command,
29
    Stdio,
30
};
31
use std::sync::RwLock;
32
use std::thread;
33
use std::time::{
34
    Duration,
35
    Instant,
36
};
37

38
use ::crossbeam_channel::{
39
    Sender,
40
    Receiver,
41
    RecvError,
42
    RecvTimeoutError,
43
    Select,
44
};
45
use ::lazy_static::lazy_static;
46
use ::memchr::memmem::Finder as memchr_Finder;
47
use ::once_cell::sync::OnceCell;
48
use ::pathsearch::find_executable_in_path;
49
use ::shell_escape::escape;
50
#[allow(unused_imports)]
51
use ::si_trace_print::{
52
    defñ,
53
    defn,
54
    defo,
55
    defx,
56
    def1ñ,
57
    def1n,
58
    def1o,
59
    def1x,
60
    def2ñ,
61
    def2n,
62
    def2o,
63
    def2x,
64
    e,
65
    ef1n,
66
    ef1o,
67
    ef1x,
68
    ef1ñ,
69
    ef2n,
70
    ef2o,
71
    ef2x,
72
    ef2ñ,
73
};
74

75
use crate::{
76
    de_err,
77
    de_wrn,
78
    debug_assert_none,
79
    debug_panic,
80
};
81
use crate::common::{
82
    Bytes,
83
    Count,
84
    FPath,
85
    threadid_to_u64,
86
    summary_stat,
87
};
88
#[cfg(any(debug_assertions, test))]
89
use crate::debug::printers::buffer_to_String_noraw;
90
use crate::readers::helpers::path_to_fpath;
91
use crate::python::venv::venv_path;
92

93
/// Exit result of the Python child process (alias of `std::process::ExitStatus`).
pub type ExitStatus = ::std::process::ExitStatus;

/// Byte size of a pipe read buffer.
pub type PipeSz = usize;

/// Single byte that separates chunks of data read from the Python process.
pub type ChunkDelimiter = u8;
101

102
/// Candidate file names of a Python interpreter executable.
/// `find_python_executable` tries them in this order, both when searching
/// `PATH` and when probing the s4 venv directories.
pub const PYTHON_NAMES: [&str; 13] = [
    // unversioned names, without and with a `.exe` suffix
    "python3", "python", "python3.exe", "python.exe",
    // version-specific names
    "python37", "python38", "python39", "python310", "python311", "python312", "python313",
    // PyPy
    "pypy3", "pypy",
];
/// Subdirectories of a Python installation (or venv) that may hold the
/// interpreter executable. The empty string stands for the directory itself.
pub const PYTHON_SUBDIRS: [&str; 3] = ["bin", "Scripts", ""];
125

126
/// Environment variable holding the exact path to a Python interpreter
/// executable.
pub const PYTHON_ENV: &str = "S4_PYTHON";

/// Default timeout for `recv_timeout` calls made by the pipe-reader threads
/// while waiting to hear whether the child Python process has exited.
pub const RECV_TIMEOUT: Duration = Duration::from_micros(5_000);

/// Capacity of the bounded channel that carries process-status messages
/// to each `PipeStreamReader` thread.
pub const CHANNEL_CAPACITY: usize = 16;

/// Default prompt text.
pub const PROMPT_DEFAULT: &str = "$ ";
136

137
/// cached Python path found in environment variable `S4_PYTHON`.
138
/// set in `find_python_executable`
139
#[allow(non_upper_case_globals)]
140
pub static PythonPathEnv: OnceCell<Option<FPath>> = OnceCell::new();
141
/// cached Python path found in path, set in `find_python_executable`
142
#[allow(non_upper_case_globals)]
143
pub static PythonPathPath: OnceCell<Option<FPath>> = OnceCell::new();
144
/// cached Python path in s4 venv, set in `find_python_executable`
145
#[allow(non_upper_case_globals)]
146
pub static PythonPathVenv: OnceCell<Option<FPath>> = OnceCell::new();
147

148
lazy_static! {
    /// Summary statistic: the set of Python interpreter paths that were run.
    /// Intended only for summary printing.
    pub static ref PythonPathsRan: RwLock<HashSet<FPath>> = {
        defñ!("init PythonPathsRan");
        let ran: HashSet<FPath> = HashSet::new();

        RwLock::new(ran)
    };
}
158

159
/// Strategy for locating a Python interpreter executable;
/// see `find_python_executable`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PythonToUse {
    /// Only the interpreter named by environment variable `S4_PYTHON`.
    Env,
    /// Only an interpreter found by searching `PATH`.
    Path,
    /// `S4_PYTHON` when set to a non-empty value, otherwise search `PATH`.
    EnvPath,
    /// Only the interpreter inside the predetermined s4 `.venv`.
    Venv,
    /// `S4_PYTHON` when set to a non-empty value, otherwise the s4 `.venv`
    /// interpreter.
    EnvVenv,
    /// A value passed in by the caller; not a lookup strategy.
    Value,
}
176

177
/// find a Python executable.
178
/// `python_to_use` instructs how to find the Python executable.
179
/// Does not check if the found Python executable is valid.
180
/// Caches found paths for reliability among threads.
181
/// Returns `None` when passed `PythonToUse::Value`.
182
pub fn find_python_executable(python_to_use: PythonToUse) -> &'static Option<FPath> {
85✔
183
    defn!("{:?}", python_to_use);
85✔
184

185
    match python_to_use {
85✔
186
        PythonToUse::Env => {
187
            let ret: &Option<FPath> = PythonPathEnv.get_or_init(||
12✔
188
                // check process environment variable
189
                match env::var(PYTHON_ENV) {
1✔
190
                    Ok(val) => {
×
191
                        defo!("env::var found {}={:?}", PYTHON_ENV, val);
×
192
                        if ! val.is_empty() {
×
193
                            Some(val)
×
194
                        } else {
195
                            None
×
196
                        }
197
                    }
198
                    Err (_err) => {
1✔
199
                        defo!("env::var did not find {:?}; {:?}", PYTHON_ENV, _err);
1✔
200
                        None
1✔
201
                    }
202
                }
1✔
203
            );
204
            defx!("{:?}, return {:?}", python_to_use, ret);
12✔
205

206
            ret
12✔
207
        }
208
        PythonToUse::Path => {
209
            let ret: &Option<FPath> = PythonPathPath.get_or_init(||{
42✔
210
                let mut python_path: Option<PathBuf> = None;
1✔
211
                // check PATH for python executable
212
                for name in PYTHON_NAMES.iter() {
1✔
213
                    defo!("find_executable_in_path({:?})", name);
1✔
214
                    if let Some(p) = find_executable_in_path(name) {
1✔
215
                        defo!("find_executable_in_path returned {:?}", p);
1✔
216
                        python_path = Some(p);
1✔
217
                        break;
1✔
218
                    };
×
219
                }
220
                if let Some(p) = python_path {
1✔
221
                    Some(path_to_fpath(p.as_path()))
1✔
222
                } else {
223
                    None
×
224
                }
225
            });
1✔
226
            defx!("{:?}, return {:?}", python_to_use, ret);
42✔
227

228
            ret
42✔
229
        }
230
        PythonToUse::EnvPath => {
231
            // try Env then try Path
232
            let p = find_python_executable(PythonToUse::Env);
1✔
233
            if p.is_some() {
1✔
234
                defx!("{:?}, return {:?}", python_to_use, p);
×
235
                return p;
×
236
            }
1✔
237
            let p = find_python_executable(PythonToUse::Path);
1✔
238
            defx!("{:?}, return {:?}", python_to_use, p);
1✔
239

240
            p
1✔
241
        }
242
        PythonToUse::Venv => {
243
            let ret: &Option<FPath> = PythonPathVenv.get_or_init(||{
19✔
244
                // get the venv path
245
                let venv: PathBuf = venv_path();
1✔
246
                defo!("venv={:?}", venv);
1✔
247
                // look for common subdirectories of Python virtual environments where the
248
                // Python executable may be found
249
                // XXX: we could try to do this by platform
250
                //      i.e. on Windows only look in "Scripts", etc.
251
                //      but this is fine
252
                for dir in PYTHON_SUBDIRS.iter() {
1✔
253
                    let mut venv_dir = venv.clone();
1✔
254
                    if ! dir.is_empty() {
1✔
255
                        venv_dir.push(dir);
1✔
256
                    }
1✔
257
                    for name in PYTHON_NAMES.iter() {
1✔
258
                        let mut venv_name = venv_dir.clone();
1✔
259
                        venv_name.push(name);
1✔
260
                        defo!("venv_name.exists?={:?}", venv_name);
1✔
261
                        if venv_name.exists() {
1✔
262
                            let fp = path_to_fpath(venv_name.as_path());
1✔
263
                            defo!("found venv python executable: {:?}", fp);
1✔
264
                            return Some(fp);
1✔
265
                        }
×
266
                    }
267
                }
268
                None
×
269
            });
1✔
270
            defx!("{:?}, return {:?}", python_to_use, ret);
19✔
271

272
            ret
19✔
273
        }
274
        PythonToUse::EnvVenv => {
275
            // try Env then try Venv
276
            let p = find_python_executable(PythonToUse::Env);
11✔
277
            if p.is_some() {
11✔
278
                defx!("{:?}, return {:?}", python_to_use, p);
×
279
                return p;
×
280
            }
11✔
281
            let p = find_python_executable(PythonToUse::Venv);
11✔
282
            defx!("{:?}, return {:?}", python_to_use, p);
11✔
283

284
            p
11✔
285
        }
286
        PythonToUse::Value => {
287
            debug_panic!("PythonToUse::Value should not be used in find_python_executable");
×
288

289
            &None
×
290
        }
291
    }
292
}
85✔
293

294
#[derive(Debug)]
295
enum PipedChunk {
296
    /// a chunk of bytes read from the child process
297
    Chunk(Bytes),
298
    /// process not sending but still running
299
    Continue,
300
    /// process exited or no more data to read
301
    /// contains number of reads performed and remaining bytes
302
    Done(u64, Bytes),
303
}
304

305
/// State of the child process, as sent to a `PipeStreamReader` thread.
/// Defaults to `Running`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum ProcessStatus {
    /// The child process is still running.
    #[default]
    Running,
    /// The child process has exited.
    Exited,
}
311

312
/// Reads data from the pipe and returns chunks of data to the caller.
313
/// Churks are delimited by the passed `chunk_delimiter_opt` if set.
314
/// If `chunk_delimiter_opt` is `None` then each read immediately returns any data read.
315
///
316
/// Inspired by gist [ArtemGr/db40ae04b431a95f2b78](https://gist.github.com/ArtemGr/db40ae04b431a95f2b78).
317
struct PipeStreamReader {
318
    chunk_receiver: Receiver<core::result::Result<PipedChunk, Error>>,
319
    exit_sender: Sender<ProcessStatus>,
320
}
321

322
impl PipeStreamReader {
323
    /// Starts a thread reading bytes from a child process pipe.
324
    ///
325
    /// `pipe_sz` is the size of the Pipe chunk buffer in bytes.
326
    ///
327
    /// `recv_timeout` is the timeout duration for calls to `recv_timeout()`.
328
    ///
329
    /// `chunk_delimiter_opt` is an optional byte delimiter used to separate chunks of data.
330
    /// If `None` then each read immediately returns any data read.
331
    ///
332
    /// `stream_child_proc` is the `Read` stream of the child process to read from.
333
    ///
334
    /// `name` and `pid` are used for debugging messages.
335
    /// `name` is the name of the pipe.
336
    /// `pid` is the process ID of the child process.
337
    fn new(
138✔
338
        name: String,
138✔
339
        pid: u32,
138✔
340
        pipe_sz: PipeSz,
138✔
341
        recv_timeout: Duration,
138✔
342
        chunk_delimiter_opt: Option<ChunkDelimiter>,
138✔
343
        mut stream_child_proc: Box<dyn Read + Send>
138✔
344
    ) -> PipeStreamReader
138✔
345
    {
346
        def1n!("PipeStreamReader new(pipe_sz={}, name={:?}, chunk_delimiter_opt={:?})",
138✔
347
               pipe_sz, name, chunk_delimiter_opt);
348
        def1o!("PipeStreamReader {:?} create bounded({}) channel", name, CHANNEL_CAPACITY);
138✔
349
        let (tx_exit, rx_exit) =
138✔
350
            ::crossbeam_channel::bounded(CHANNEL_CAPACITY);
138✔
351

352
        PipeStreamReader {
353
            chunk_receiver: {
354
                let thread_name: String = format!("{}_PipeStreamReader", name);
138✔
355
                let _thread_name2: String = thread_name.clone();
138✔
356
                // parent thread ID
357
                let _tidn_p: u64 = threadid_to_u64(thread::current().id());
138✔
358
                // debug message prepend
359
                let _d_p = format!(
138✔
360
                    "PipeStreamReader {:?} PID {:?} PTID {:?}",
138✔
361
                    name, pid, _tidn_p
362
                );
363
                def1o!("{_d_p} create unbounded() channel");
138✔
364
                let (tx_parent, rx_parent) =
138✔
365
                    ::crossbeam_channel::unbounded();
138✔
366

367
                let thread_pipe = thread::Builder::new().name(thread_name.clone());
138✔
368

369
                def1o!("{_d_p} spawn thread {:?}", thread_name);
138✔
370
                let result = thread_pipe.spawn(move ||
138✔
371
                {
138✔
372
                    // debug message prepend
373
                    let _d_p = format!(
138✔
374
                        "PipeStreamReader {:?} PID {:?} PTID {:?} TID {:?}",
138✔
375
                        name, pid, _tidn_p, threadid_to_u64(thread::current().id()));
138✔
376
                    def2n!("{_d_p} start, pipe_sz {}", pipe_sz);
138✔
377
                    let mut _recv_bytes: usize = 0;
138✔
378
                    let mut reads: usize = 0;
138✔
379
                    let mut _sends: usize = 0;
138✔
380
                    let mut delim_found: bool = false;
138✔
381
                    let mut buf = Bytes::with_capacity(pipe_sz);
138✔
382
                    let buf_chunk1_sz: usize = match chunk_delimiter_opt {
138✔
383
                        Some(_delim) => pipe_sz,
79✔
384
                        None => 0, // `buf_chunk1` not used
59✔
385
                    };
386
                    let mut buf_chunk1: Bytes = Bytes::with_capacity(buf_chunk1_sz);
138✔
387
                    //let mut buf_chunk2: Bytes = Bytes::with_capacity(pipe_sz);
388
                    loop {
389
                        reads += 1;
81,111✔
390
                        buf.clear();
81,111✔
391
                        buf.resize(pipe_sz, 0);
81,111✔
392

393
                        def2o!("{_d_p} stream_child_proc.read(buf capacity {}, len {})…", buf.capacity(), buf.len());
81,111✔
394
                        /*
395
                        From the docs regarding read():
396

397
                            This function does not provide any guarantees about whether it blocks
398
                            waiting for data, but if an object needs to block for a read and cannot,
399
                            it will typically signal this via an Err return value.
400

401
                        See https://doc.rust-lang.org/1.83.0/std/io/trait.Read.html#tymethod.read
402
                        */
403
                        match stream_child_proc.read(&mut buf) {
81,111✔
404
                            Ok(0) => {
405
                                def2o!("{_d_p} read zero bytes of {} total", _recv_bytes);
143✔
406
                                /*
407
                                From the docs regarding read() returning Ok(0):
408

409
                                    This reader has reached its "end of file" and will likely no
410
                                    longer be able to produce bytes. Note that this does not mean
411
                                    that the reader will always no longer be able to produce bytes.
412
                                    As an example, on Linux, this method will call the recv syscall
413
                                    for a [TcpStream], where returning zero indicates the connection
414
                                    was shut down correctly. While for [File], it is possible to
415
                                    reach the end of file and get zero as result, but if more data
416
                                    is appended to the file, future calls to read will return more
417
                                    data.
418

419
                                See https://doc.rust-lang.org/1.83.0/std/io/trait.Read.html#tymethod.read
420
                                */
421
                                if delim_found {
143✔
422
                                    delim_found = false;
53✔
423
                                }
90✔
424
                                // XXX: if the child python process has exited then this may become
425
                                //      a busy loop until the parent thread notices the python
426
                                //      process has exited and then can send a
427
                                //      `ProcessStatus::Exited`.
428
                                //      Using `recv_timeout(5ms)` softens this busy loop.
429
                                //      It's ugly but it works.
430
                                def2o!("{_d_p} rx_exit.recv_timeout({:?}) (len {})…", recv_timeout, rx_exit.len());
143✔
431
                                let rx_result = rx_exit.recv_timeout(recv_timeout);
143✔
432
                                match rx_result {
88✔
433
                                    Ok(ProcessStatus::Exited) => {
434
                                        def2o!("{_d_p} rx_exit ProcessStatus::Exited; send Done({}, buf_chunk1 {} bytes) and break",
88✔
435
                                               reads, buf_chunk1.len());
88✔
436
                                        _sends += 1;
88✔
437
                                        def2o!("{_d_p} tx_parent.send(Ok(PipedChunk::Done({}, buf_chunk1 {} bytes))) (channel len {})…",
88✔
438
                                               reads, buf_chunk1.len(), tx_parent.len());
88✔
439
                                        match tx_parent.send(Ok(PipedChunk::Done(reads as u64, buf_chunk1))) {
88✔
440
                                            Ok(_) => {}
85✔
441
                                            Err(_err) => {
3✔
442
                                                def2o!("{_d_p} tx send error: {:?}", _err);
3✔
443
                                            }
444
                                        }
445
                                        break;
88✔
446
                                    }
447
                                    Ok(ProcessStatus::Running) => {
448
                                        def2o!("{_d_p} rx_exit ProcessStatus::Running; continue reading");
×
449
                                    }
450
                                    Err(RecvTimeoutError::Timeout) => {
451
                                        def2o!("{_d_p} RecvTimeoutError::Timeout; continue reading");
17✔
452
                                    }
453
                                    Err(RecvTimeoutError::Disconnected) => {
454
                                        def2o!("{_d_p} RecvTimeoutError::Disconnected; break");
38✔
455
                                        break;
38✔
456
                                    }
457
                                }
458
                                // send Continue if no more messages to process by parent thread
459
                                if tx_parent.is_empty() {
17✔
460
                                    def2o!("{_d_p} tx_parent.send(Ok(PipedChunk::Continue))…");
17✔
461
                                    match tx_parent.send(Ok(PipedChunk::Continue)) {
17✔
462
                                        Ok(_) => {
17✔
463
                                            _sends += 1;
17✔
464
                                        }
17✔
465
                                        Err(_err) => {
×
466
                                            def2o!("{_d_p} tx send error: {:?}", _err);
×
467
                                            de_err!("{_d_p} tx send error: {:?}", _err);
×
468
                                        }
469
                                    };
470
                                }
×
471
                            }
472
                            Ok(len) => {
80,968✔
473
                                _recv_bytes += len;
80,968✔
474
                                def2o!("{_d_p} (read #{}) read {} bytes of {} total in this pipe", reads, len, _recv_bytes);
80,968✔
475
                                // is there a chunk delimiter in the buffer?
476

477
                                match chunk_delimiter_opt {
80,968✔
478
                                    Some(chunk_delimiter) => {
77,820✔
479
                                        // look for delimiter
480
                                        // TODO: [2025/12] add benchmark to compare different methods
481
                                        //       of finding a delimiter.
482
                                        //       This `find_memchr` creates a new `Finder<'_>`
483
                                        //       which may not be worth the trouble.
484
                                        let needle = &[chunk_delimiter];
77,820✔
485
                                        let finder = memchr_Finder::new(needle);
77,820✔
486
                                        let mut at: usize = 0;
77,820✔
487
                                        let mut _loop: usize = 0;
77,820✔
488
                                        while at < len {
158,091✔
489
                                            _loop += 1;
82,370✔
490
                                            def2o!("{_d_p} (read #{reads} loop {_loop}) searching for delimiter in buf[{at}..{len}] '{}'", 
82,370✔
491
                                                buffer_to_String_noraw(&buf[at..len]));
82,370✔
492
                                            match finder.find(&buf[at..len]) {
82,370✔
493
                                                Some(pos) => {
8,200✔
494
                                                    // delimiter found at pos
495
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) found delimiter at pos {} (absolute pos {}) among {} returned bytes; buf len {}, buf capacity {}",
8,200✔
496
                                                        pos, at + pos, len, buf.len(), buf.capacity());
8,200✔
497
                                                    debug_assert!(at + pos < buf.len(), "at {} + pos {} >= buf.len {}", at, pos, buf.len());
8,200✔
498
                                                    // send chunks, keep the remainder
499
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.extend_from_slice(buf[{}..{}])", at, at + pos + 1);
8,200✔
500
                                                    buf_chunk1.extend_from_slice(&buf[at..at + pos + 1]);
8,200✔
501
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: '{}'", buffer_to_String_noraw(&buf_chunk1));
8,200✔
502
                                                    let blen = buf_chunk1.len();
8,200✔
503
                                                    let mut chunk_send: Bytes = Vec::<u8>::with_capacity(blen);
8,200✔
504
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) chunk_send.extend_from_slice(&buf_chunk1 len {}) (chunk_send capacity {})",
8,200✔
505
                                                        buf_chunk1.len(), chunk_send.capacity());
8,200✔
506
                                                    chunk_send.extend_from_slice(&buf_chunk1);
8,200✔
507
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) chunk_send: '{}' (channel len {})",
8,200✔
508
                                                           buffer_to_String_noraw(&chunk_send), tx_parent.len());
8,200✔
509
                                                    let data_send = PipedChunk::Chunk(chunk_send);
8,200✔
510
                                                    _sends += 1;
8,200✔
511
                                                    match tx_parent.send(Ok(data_send)) {
8,200✔
512
                                                        Ok(_) => {
513
                                                            def2o!("{_d_p} (read #{reads} loop {_loop}) sent chunk_send {} bytes, send #{_sends}", blen);
6,101✔
514
                                                        }
515
                                                        Err(_err) => {
2,099✔
516
                                                            def2o!("{_d_p} (read #{reads} loop {_loop}) send error: {:?}", _err);
2,099✔
517
                                                            break;
2,099✔
518
                                                        }
519
                                                    }
520
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.clear()");
6,101✔
521
                                                    buf_chunk1.clear();
6,101✔
522
                                                    // def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.extend_from_slice(&buf[{}..{}]) (buf len {}, buf capacity {})",
523
                                                    //     at + pos + 1, len, buf.len(), buf.capacity());
524
                                                    // buf_chunk1.extend_from_slice(&buf[at + pos + 1..len]);
525
                                                    // def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: len {}, capacity {}; contents: '{}'",
526
                                                    //     buf_chunk1.len(), buf_chunk1.capacity(), buffer_to_String_noraw(&buf_chunk1));
527
                                                    delim_found = true;
6,101✔
528
                                                    at += pos + 1;
6,101✔
529
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) {} bytes remaining in buf", len - at);
6,101✔
530
                                                }
531
                                                None => {
532
                                                    // delimiter not found, save buffer and then read child process again
533
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) no delimiter; buf_chunk1.extend_from_slice(&buf[{}..{}]) '{}'",
74,170✔
534
                                                        at, len, buffer_to_String_noraw(&buf[at..len]));
74,170✔
535
                                                    buf_chunk1.extend_from_slice(&buf[at..len]);
74,170✔
536
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: len {}, capacity {}; contents: '{}'",
74,170✔
537
                                                        buf_chunk1.len(), buf_chunk1.capacity(), buffer_to_String_noraw(&buf_chunk1));
74,170✔
538
                                                    delim_found = false;
74,170✔
539
                                                    at += len + 1;
74,170✔
540
                                                }
541
                                            }
542
                                        }
543
                                    }
544
                                    None => {
545
                                        // no delimiter configured, send entire buffer as a chunk
546
                                        let slice_ = &buf[..len];
3,148✔
547
                                        let blen = slice_.len();
3,148✔
548
                                        let mut chunk_send: Bytes = Vec::<u8>::with_capacity(blen);
3,148✔
549
                                        chunk_send.extend_from_slice(slice_);
3,148✔
550
                                        let data_send = PipedChunk::Chunk(chunk_send);
3,148✔
551
                                        delim_found = false;
3,148✔
552
                                        def2o!("{_d_p} (read #{reads}) read {} bytes of {} total; no delimiter configured, send Chunk {} bytes",
3,148✔
553
                                            len, _recv_bytes, blen);
554
                                        _sends += 1;
3,148✔
555
                                        match tx_parent.send(Ok(data_send)) {
3,148✔
556
                                            Ok(_) => {
557
                                                def2o!("{_d_p} (read #{reads}) sent chunk_send {} bytes, send #{_sends}", blen);
3,136✔
558
                                            }
559
                                            Err(_err) => {
12✔
560
                                                def2o!("{_d_p} (read #{reads}) send error: {:?}", _err);
12✔
561
                                                break;
12✔
562
                                            }
563
                                        }
564
                                    }
565
                                }
566
                            }
567
                            Err(error) => {
×
568
                                if error.kind() == ErrorKind::Interrupted {
×
569
                                    def2o!("{_d_p} (read #{reads}) read interrupted; retry");
×
570
                                    continue;
×
571
                                }
×
572
                                def2o!("{_d_p} (read #{reads}) read error {}; send Error", error);
×
573
                                delim_found = false;
×
574
                                _sends += 1;
×
575
                                match tx_parent.send(Err(error)) {
×
576
                                    Ok(_) => {}
×
577
                                    Err(_err) => {
×
578
                                        def2o!("{_d_p} (read #{reads}) send error: {:?}", _err);
×
579
                                        break;
×
580
                                    }
581
                                }
582
                            }
583
                        }
584
                    }
585
                    def2x!(
138✔
586
                        "{_d_p} exit, received {} bytes, child process reads {}, parent thread sends {}",
138✔
587
                        _recv_bytes, reads, _sends
588
                    );
589
                });
138✔
590
                match result {
138✔
591
                    Ok(_handle) => {
138✔
592
                        def1o!("{_d_p} spawned thread {:?}", _thread_name2);
138✔
593
                    }
594
                    Err(_err) => {
×
595
                        def1o!("{_d_p} thread spawn error: {}", _err);
×
596
                    }
597
                }
598
                def1x!("{_d_p} return Receiver");
138✔
599

600
                rx_parent
138✔
601
            },
602
            exit_sender: tx_exit,
138✔
603
        }
604
    }
138✔
605
}
606

607
/// `PyRunner` is a struct that represents a Python process instance. It hides
608
/// the complications of starting and communicating with a Python process
609
/// over pipes. It uses `std::process::Child` to start and manage the Python
610
/// process. This handles the complexity of asynchronous inter-process
611
/// communication which is non-trivial to implement decently.
612
///
613
/// _XXX:_ ideally, this would use `PyO3` to communicate with a Python interpreter
614
/// instance. However, [`PyO3::Python::attach`] only creates one
615
/// Python process per Rust process.
616
/// And `PyO3` does not provide a way to create Python subprocesses. So all
617
/// Rust process threads that would use `PyO3` are bottlenecked by this
618
/// one Python process which is of course, in effect, a single-threaded process.
619
/// See [PyO3 Issue #576].
620
/// So instead each `PyRunner` instance creates a new Python process using
621
/// [`std::process::Child`] and communicates over stdout, stderr, and stdin pipes.
622
///
623
/// _XXX:_ I also tried using crate `subprocess` to manage the Python process. However,
624
/// it was not able to handle irregular asynchronous communication.
625
///
626
/// [`PyO3::Python::attach`]: https://docs.rs/pyo3/0.27.1/pyo3/marker/struct.Python.html#method.attach
627
/// [PyO3 Issue #576]: https://github.com/PyO3/pyo3/issues/576
628
pub struct PyRunner {
629
    /// handle to the Python process
630
    //pub process: subprocess::Popen,
631
    pub process: Child,
632
    pipe_stdout: PipeStreamReader,
633
    pipe_stderr: PipeStreamReader,
634
    /// arguments of the process
635
    argv: Vec<String>,
636
    /// path to Python exectuable
637
    pub python_path: FPath,
638
    /// save the `ExitStatus`
639
    exit_status: Option<ExitStatus>,
640
    pipe_stdout_eof: bool,
641
    pipe_stderr_eof: bool,
642
    /// protect against sending repeat exit messages to child pipe threads.
643
    /// only used in `pipes_exit_sender()`
644
    pipe_sent_exit: bool,
645
    /// pipe buffer size in bytes for stdout `PipeStreamReader`
646
    pub pipe_sz_stdout: PipeSz,
647
    /// pipe buffer size in bytes for stderr `PipeStreamReader`
648
    pub pipe_sz_stderr: PipeSz,
649
    /// `Instant` Python process was started.
650
    time_beg: Instant,
651
    /// `Instant` the Python process was first known to be exited.
652
    time_end: Option<Instant>,
653
    /// process ID of the Python process
654
    pid_: u32,
655
    /// this thread ID. For help during debugging.
656
    _tidn: u64,
657
    /// debug message prepend. For help during debugging.
658
    _d_p: String,
659
    /// all stderr is stored in case the process exits with an error
660
    /// this is because stderr may be `read` and only later calls to
661
    /// `poll` or `wait` may find the process has exited.
662
    ///
663
    /// oldest stderr data nearest the front
664
    stderr_all: Option<VecDeque<u8>>,
665
    /// Summary statistic.
666
    /// Maximum number of messages seen in the pipe_stdout channel.
667
    pub(crate) pipe_channel_max_stdout: Count,
668
    /// Summary statistic.
669
    /// Maximum number of messages seen in the pipe_stderr channel.
670
    pub(crate) pipe_channel_max_stderr: Count,
671
    /// Summary statistic.
672
    /// count of reads performed by pipeline thread reading Python process stdout
673
    pub(crate) count_proc_reads_stdout: Count,
674
    /// Summary statistic.
675
    /// count of reads performed by pipeline thread reading Python process stderr
676
    pub(crate) count_proc_reads_stderr: Count,
677
    /// Summary statistic.
678
    /// count of recv of pipeline thread reading Python process stdout
679
    pub(crate) count_pipe_recv_stdout: Count,
680
    /// Summary statistic.
681
    /// count of recv of pipeline thread reading Python process stderr
682
    pub(crate) count_pipe_recv_stderr: Count,
683
    /// Summary statistic.
684
    /// count of writes to Python process stdin
685
    pub(crate) count_proc_writes: Count,
686
    /// Summary statistic.
687
    /// count of polls of Python process
688
    pub(crate) count_proc_polls: Count,
689
    /// Duration of process waiting
690
    pub(crate) duration_proc_wait: Duration,
691
    /// first seen error
692
    pub(crate) error: Option<Error>,
693
}
694

695
impl std::fmt::Debug for PyRunner {
696
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
697
        f.debug_struct("PyRunner")
×
698
            .field("process", &self.process)
×
699
            .field("argv", &self.argv)
×
700
            .field("python_path", &self.python_path)
×
701
            .field("exit_status", &self.exit_status)
×
702
            .field("pipe_stdout_eof", &self.pipe_stdout_eof)
×
703
            .field("pipe_stderr_eof", &self.pipe_stderr_eof)
×
704
            .field("pipe_sent_exit", &self.pipe_sent_exit)
×
705
            .field("pipe_sz_stdout", &self.pipe_sz_stdout)
×
706
            .field("pipe_sz_stderr", &self.pipe_sz_stderr)
×
707
            .field("time_beg", &self.time_beg)
×
708
            .field("time_end", &self.time_end)
×
709
            .field("pid_", &self.pid_)
×
710
            .field("_tidn", &self._tidn)
×
711
            .finish()
×
712
    }
×
713
}
714

715
impl PyRunner {
716
    /// Create a new `PyRunner` instance.
717
    ///
718
    /// `python_to_use` indicates which Python executable to use.
719
    /// If `PythonToUse::Value` is used then `python_path` must be `Some(FPath)`.
720
    /// Otherwise `python_path` must be `None`.
721
    ///
722
    /// `argv` is the list of arguments to pass to the Python executable.
723
    pub fn new(
69✔
724
        python_to_use: PythonToUse,
69✔
725
        pipe_sz: PipeSz,
69✔
726
        recv_timeout: Duration,
69✔
727
        chunk_delimiter_stdout: Option<ChunkDelimiter>,
69✔
728
        chunk_delimiter_stderr: Option<ChunkDelimiter>,
69✔
729
        python_path: Option<FPath>,
69✔
730
        argv: Vec<&str>
69✔
731
    ) -> Result<Self> {
69✔
732
        def1n!("python_to_use {:?}, python_path {:?}, pipe_sz {:?}, chunk_delimiter_stdout {:?}, chunk_delimiter_stderr {:?}, argv {:?}",
69✔
733
            python_to_use, python_path, pipe_sz, chunk_delimiter_stdout, chunk_delimiter_stderr, argv);
734

735
        let python_path_: &FPath;
736
        // get the Python exectuble
737
        if python_to_use == PythonToUse::Value {
69✔
738
            match &python_path {
49✔
739
                Some(val) => python_path_ = val,
49✔
740
                None => {
741
                    let s = format!("PyRunner::new: python_path must be Some(FPath) when python_to_use is Value");
×
742
                    def1x!("Error InvalidInput {}", s);
×
743
                    return Result::Err(
×
744
                        Error::new(ErrorKind::InvalidInput, s)
×
745
                    );
×
746
                }
747
            }
748
        } else {
749
            debug_assert_none!(python_path, "python_path must be None unless python_to_use is Value");
20✔
750
            python_path_ = match find_python_executable(python_to_use) {
20✔
751
                Some(s) => s,
20✔
752
                None => {
753
                    let s = format!(
×
754
                        "failed to find a Python interpreter; create the Python virtual environment with command --venv, or you may specify the Python interpreter path using environment variable {}; failed",
×
755
                        PYTHON_ENV
756
                    );
757
                    def1x!("{}", s);
×
758
                    return Result::Err(
×
759
                        Error::new(ErrorKind::NotFound, s)
×
760
                    )
×
761
                }
762
            };
763
        }
764
        def1o!("Using Python executable: {:?}", python_path_);
69✔
765

766
        // construct argv_
767
        let mut argv_: Vec<&str> = Vec::with_capacity(argv.len() + 1);
69✔
768
        argv_.push(python_path_.as_str());
69✔
769
        for arg in argv.iter() {
198✔
770
            argv_.push(arg);
198✔
771
        }
198✔
772

773
        summary_stat!(
69✔
774
            // save this python path
775
            match PythonPathsRan.write() {
69✔
776
                Ok(mut set) => {
69✔
777
                    if ! set.contains(python_path_) {
69✔
778
                        set.insert(python_path_.clone());
2✔
779
                    }
67✔
780
                }
781
                Err(err) => {
×
782
                    def1x!("Failed to acquire write lock on PythonPathsRan: {}", err);
×
783
                    return Result::Err(
×
784
                        Error::new(
×
785
                            ErrorKind::Other,
×
786
                            format!("Failed to acquire write lock on PythonPathsRan: {}", err),
×
787
                        )
×
788
                    );
×
789
                }
790
            }
791
        );
792

793
        def1o!("Command::new({:?}).args({:?}).spawn()", python_path_, Vec::from_iter(argv_.iter().skip(1)));
69✔
794
        let time_beg: Instant = Instant::now();
69✔
795
        let result = Command::new(python_path_.as_str())
69✔
796
            .args(argv_.iter().skip(1))
69✔
797
            .stdin(Stdio::piped())
69✔
798
            .stdout(Stdio::piped())
69✔
799
            .stderr(Stdio::piped())
69✔
800
            .spawn();
69✔
801
        let mut process: Child = match result {
69✔
802
            Ok(p) => p,
69✔
803
            Err(err) => {
×
804
                def1x!("Failed to start Python process: {}", err);
×
805
                return Result::Err(
×
806
                    Error::new(
×
807
                        err.kind(),
×
808
                        format!("Python process failed to start: Python path {:?}; {}",
×
809
                            python_path_, err),
×
810
                    )
×
811
                );
×
812
            }
813
        };
814

815
        // TODO: [2025/11] is there a more rustic one-liner to create Vec<String> from Vec<&str>?
816
        let mut argv: Vec<String> = Vec::with_capacity(argv_.len());
69✔
817
        for a in argv_.into_iter() {
267✔
818
            argv.push(String::from(a));
267✔
819
        }
267✔
820

821
        let pid: u32 = process.id();
69✔
822
        def1o!("Python process PID {}", pid);
69✔
823

824
        let _d_p = format!("Python process {}", pid);
69✔
825

826
        let process_stdout = match process.stdout.take() {
69✔
827
            Some(s) => s,
69✔
828
            None => {
829
                let s = format!("{_d_p} stdout was None");
×
830
                def1x!("{}", s);
×
831
                return Result::Err(
×
832
                    Error::other(s)
×
833
                );
×
834
            }
835
        };
836
        let process_stderr = match process.stderr.take() {
69✔
837
            Some(s) => s,
69✔
838
            None => {
839
                let s = format!("{_d_p} stderr was None");
×
840
                def1x!("{}", s);
×
841
                return Result::Err(
×
842
                    Error::other(s)
×
843
                );
×
844
            }
845
        };
846

847
        // create PipeStreamReaders for stdout, stderr
848
        def1o!("{_d_p} PipeStreamReader::new() stdout");
69✔
849
        let pipe_sz_stdout: usize = pipe_sz;
69✔
850
        let pipe_stdout = PipeStreamReader::new(
69✔
851
            String::from("stdout"),
69✔
852
            pid,
69✔
853
            pipe_sz_stdout,
69✔
854
            recv_timeout,
69✔
855
            chunk_delimiter_stdout,
69✔
856
            Box::new(process_stdout)
69✔
857
        );
858
        def1o!("{_d_p} PipeStreamReader::new() stderr");
69✔
859
        // stderr pipe capped at 5096 bytes
860
        let pipe_sz_stderr: usize = min(pipe_sz, 5096);
69✔
861
        let pipe_stderr = PipeStreamReader::new(
69✔
862
            String::from("stderr"),
69✔
863
            pid,
69✔
864
            pipe_sz_stderr,
69✔
865
            recv_timeout,
69✔
866
            chunk_delimiter_stderr,
69✔
867
            Box::new(process_stderr)
69✔
868
        );
869

870
        let _tidn: u64 = threadid_to_u64(thread::current().id());
69✔
871
        defx!("{_d_p} PyRunner created for Python process PID {}, TID {}", pid, _tidn);
69✔
872

873
        Result::Ok(Self {
69✔
874
            process,
69✔
875
            pipe_stdout,
69✔
876
            pipe_stderr,
69✔
877
            argv,
69✔
878
            python_path: python_path_.clone(),
69✔
879
            exit_status: None,
69✔
880
            pipe_stdout_eof: false,
69✔
881
            pipe_stderr_eof: false,
69✔
882
            pipe_sent_exit: false,
69✔
883
            pipe_sz_stdout,
69✔
884
            pipe_sz_stderr,
69✔
885
            pid_: pid,
69✔
886
            _tidn,
69✔
887
            _d_p,
69✔
888
            stderr_all: None,
69✔
889
            time_beg,
69✔
890
            time_end: None,
69✔
891
            pipe_channel_max_stdout: 0,
69✔
892
            pipe_channel_max_stderr: 0,
69✔
893
            count_proc_reads_stdout: 0,
69✔
894
            count_proc_reads_stderr: 0,
69✔
895
            count_pipe_recv_stdout: 0,
69✔
896
            count_pipe_recv_stderr: 0,
69✔
897
            count_proc_writes: 0,
69✔
898
            count_proc_polls: 0,
69✔
899
            duration_proc_wait: Duration::default(),
69✔
900
            error: None,
69✔
901
        })
69✔
902
    }
69✔
903

904
    #[allow(dead_code)]
905
    pub fn pid(&self) -> u32 {
×
906
        self.pid_
×
907
    }
×
908

909
    /// Returns the process exit status.
910
    /// If the process has not exited yet, returns `None`.
911
    pub fn exit_status(&self) -> Option<ExitStatus> {
10✔
912
        self.exit_status
10✔
913
    }
10✔
914

915
    /// Returns `true` if the process exited successfully.
916
    pub fn exit_okay(&self) -> bool {
151✔
917
        self.exit_status == Some(ExitStatus::default())
151✔
918
    }
151✔
919

920
    /// convert a `RecvError` into an `Error`
921
    fn new_error_from_recverror(&self, recverror: &RecvError) -> Error {
×
922
        Error::new(
×
923
            ErrorKind::Other,
×
924
            format!("Python process {} RecvError: {}",
×
925
                self.pid_, recverror),
926
        )
927
    }
×
928

929
    /// Returns all stderr data accumulated so far.
930
    /// oldest stderr data nearest the front
931
    pub fn stderr_all(&mut self) -> Option<&[u8]> {
×
932
        match &mut self.stderr_all {
×
933
            Some(v) => {
×
934
                v.make_contiguous();
×
935
                Some(v.as_slices().0)
×
936
            },
937
            None => None,
×
938
        }
939
    }
×
940

941
    /// Poll the Python process to see if it has exited.
942
    /// If the process has exited, returns `Some(ExitStatus)`.
943
    /// If the process is still running, returns `None`.
944
    pub fn poll(&mut self) -> Option<ExitStatus> {
9,292✔
945
        let _d_p: &String = &self._d_p;
9,292✔
946
        def1n!("{_d_p} poll()");
9,292✔
947

948
        summary_stat!(self.count_proc_polls += 1);
9,292✔
949

950
        match self.process.try_wait() {
9,292✔
951
            Ok(Some(exit_status)) => {
2,652✔
952
                if self.time_end.is_none() {
2,652✔
953
                    self.time_end = Some(Instant::now());
18✔
954
                    debug_assert_none!(self.exit_status, "exit_status should not be set yet");
18✔
955
                }
2,634✔
956
                // XXX: not sure if the subprocess returned ExitStatus would
957
                //      change on later polls, so only set it once
958
                let mut _was_exited = true;
2,652✔
959
                if self.exit_status.is_none() {
2,652✔
960
                    self.exit_status = Some(exit_status);
18✔
961
                    _was_exited = false;
18✔
962
                }
2,634✔
963
                if exit_status.success() {
2,652✔
964
                    def1x!("{_d_p} exited successfully{}", if _was_exited { " was" } else { "" });
2,640✔
965
                } else if let Some(_code) = exit_status.code() {
12✔
966
                    def1x!("{_d_p} exited with code {}{}", _code, if _was_exited { " was" } else { "" });
12✔
967
                } else {
968
                    def1x!("{_d_p} exited with status {:?}", exit_status);
×
969
                }
970
                self.pipes_exit_sender(ProcessStatus::Exited);
2,652✔
971

972
                Some(exit_status)
2,652✔
973
            },
974
            Ok(None) => {
975
                // Process is still alive
976
                def1x!("{_d_p} is still running");
6,640✔
977

978
                None
6,640✔
979
            },
980
            Err(err) => {
×
981
                def1x!("{_d_p} poll error: {}", err);
×
982
                self.error = Some(err);
×
983
                self.pipes_exit_sender(ProcessStatus::Exited);
×
984

985
                None
×
986
            }
987
        }
988
    }
9,292✔
989

990
    /// Accumulate stderr data up to a maximum number of bytes.
991
    // TODO: [2025/12] isn't there a more rustic way to do this?
992
    //       or a crate that does this?
993
    fn stderr_all_add(&mut self, stderr_data: &Bytes) {
4,640✔
994
        const MAX_STDERR_ALL_BYTES: usize = 1024;
995
        match self.stderr_all.as_mut() {
4,640✔
996
            Some(se_prior) => {
4,613✔
997
                // store as much as possible of prior + new stderr data
998
                if se_prior.len() + stderr_data.len() <= MAX_STDERR_ALL_BYTES {
4,613✔
999
                    se_prior.extend(stderr_data.iter());
1,622✔
1000
                } else {
1,622✔
1001
                    // need to drop oldest prior data from the front
1002
                    let mut to_drop: usize = se_prior.len() + stderr_data.len() - MAX_STDERR_ALL_BYTES;
2,991✔
1003
                    while to_drop > 0 {
75,629✔
1004
                        se_prior.pop_front();
72,638✔
1005
                        to_drop -= 1;
72,638✔
1006
                    }
72,638✔
1007
                    // signify the front data has been cut off
1008
                    for b_ in "…".bytes() {
8,973✔
1009
                        se_prior.push_front(b_);
8,973✔
1010
                    }
8,973✔
1011
                    // separate prior data from new data
1012
                    se_prior.push_back(b'\n');
2,991✔
1013
                    se_prior.push_back(b'\n');
2,991✔
1014
                    // append the new data
1015
                    se_prior.extend(stderr_data.iter());
2,991✔
1016
                }
1017
            }
1018
            None => {
1019
                let mut v = VecDeque::<u8>::with_capacity(
27✔
1020
                    if stderr_data.len() > MAX_STDERR_ALL_BYTES {
27✔
1021
                        MAX_STDERR_ALL_BYTES
×
1022
                    } else {
1023
                        stderr_data.len()
27✔
1024
                    }
1025
                );
1026
                v.extend(stderr_data.iter());
27✔
1027
                self.stderr_all = Some(v);
27✔
1028
            }
1029
        }
1030
    }
4,640✔
1031

1032
    /// Send exit message to both stdout and stderr pipe threads.
1033
    /// May be called multiple times but only the first call has effect.
1034
    fn pipes_exit_sender(&mut self, pe: ProcessStatus) {
2,733✔
1035
        if self.pipe_sent_exit {
2,733✔
1036
            return;
2,689✔
1037
        }
44✔
1038
        def2ñ!("{} pipes_exit_sender({:?}) (channels len {}, {})",
44✔
1039
               self._d_p, pe, self.pipe_stdout.exit_sender.len(), self.pipe_stderr.exit_sender.len());
44✔
1040
        self.pipe_stdout.exit_sender.send(pe).unwrap_or(());
44✔
1041
        self.pipe_stderr.exit_sender.send(pe).unwrap_or(());
44✔
1042
        self.pipe_sent_exit = true;
44✔
1043
    }
2,733✔
1044

1045
    /// Write to `input_data` then read from the Python process stdout and
1046
    /// stderr.
1047
    /// Returns (`exited`, `stdout`, `stderr`).
1048
    ///
1049
    /// The stderr_all field accumulates all stderr data read so far. This is to help
1050
    /// when some error occurs in the Python process but the process has not yet exited
1051
    /// and then later calls to `read` find the process has exited. The earlier writes to
1052
    /// stderr are preserved in stderr_all because they often have the crucial error
1053
    /// information e.g. a Python stack trace.
1054
    pub fn write_read(&mut self, input_data: Option<&[u8]>) -> (bool, Option<Bytes>, Option<Bytes>) {
9,292✔
1055
        let _len = input_data.unwrap_or(&[]).len();
9,292✔
1056
        def1n!("{} input_data: {} bytes", self._d_p, _len);
9,292✔
1057

1058
        if let Some(_exit_status) = self.poll() {
9,292✔
1059
            def1o!("{} already exited before read", self._d_p);
2,652✔
1060
        }
6,640✔
1061

1062
        // write string, read from stdout and stderr after poll as there may still be data to read
1063
        // even if the process has exited
1064

1065
        // write to stdin
1066
        if !self.exited() {
9,292✔
1067
            if let Some(input_data_) = input_data {
6,640✔
1068
                if !input_data_.is_empty() {
26✔
1069
                    match self.process.stdin.as_mut() {
26✔
1070
                        Some(stdin) => {
26✔
1071
                            def1o!(
26✔
1072
                                "{} writing {} bytes to stdin (\"{}\")",
26✔
1073
                                self._d_p,
1074
                                input_data_.len(),
26✔
1075
                                buffer_to_String_noraw(&input_data_[..input_data_.len().min(10)]).to_string()
26✔
1076
                            );
1077
                            match stdin.write(input_data_) {
26✔
1078
                                Ok(_len) => {
26✔
1079
                                    summary_stat!(self.count_proc_writes += 1);
26✔
1080
                                    def1o!(
26✔
1081
                                        "{} wrote {} bytes to stdin, expected {} bytes",
26✔
1082
                                        self._d_p, _len, input_data_.len()
26✔
1083
                                    );
1084
                                }
1085
                                Err(_err) => {
×
1086
                                    de_err!("Error writing to Python process {} stdin: {:?}", self.pid_, _err);
×
1087
                                    self.pipes_exit_sender(ProcessStatus::Exited);
×
1088
                                }
×
1089
                            }
1090
                        }
1091
                        None => {
×
1092
                            de_err!("{} stdin is None", self._d_p);
×
1093
                        }
×
1094
                    }
1095
                } else {
1096
                    def1o!("{} no stdin data to write", self._d_p);
×
1097
                }
1098
            }
6,614✔
1099
        } else {
1100
            def1o!("{} has exited; skip writing to stdin", self._d_p);
2,652✔
1101
        }
1102

1103
        let _d_p: &String = &self._d_p;
9,292✔
1104
        // use select to block until either channel signals data is available
1105
        let mut sel = Select::new();
9,292✔
1106
        let mut _sel_counts: usize = 0;
9,292✔
1107
        let sel_out: usize = if !self.pipe_stdout_eof {
9,292✔
1108
            let id = sel.recv(&self.pipe_stdout.chunk_receiver) + 1; // avoid zero id
9,156✔
1109
            def1o!("{_d_p} select recv(&pipe_stdout.chunk_receiver)");
9,156✔
1110
            _sel_counts += 1;
9,156✔
1111
            id
9,156✔
1112
        } else { 0 };
136✔
1113
        let sel_err: usize = if !self.pipe_stderr_eof {
9,292✔
1114
            let id = sel.recv(&self.pipe_stderr.chunk_receiver) + 1; // avoid zero id
6,786✔
1115
            def1o!("{_d_p} select recv(&pipe_stderr.chunk_receiver)");
6,786✔
1116
            _sel_counts += 1;
6,786✔
1117
            id
6,786✔
1118
        } else { 0 };
2,506✔
1119

1120
        if sel_out == 0 && sel_err == 0 {
9,292✔
1121
            def1o!("{_d_p} both stdout and stderr EOF; return");
86✔
1122
            return (self.exited_exhausted(), None, None);
86✔
1123
        }
9,206✔
1124

1125
        def1o!("{_d_p} wait on {} selects…", _sel_counts);
9,206✔
1126
        let d1: Instant = Instant::now();
9,206✔
1127
        let sel_oper = sel.select();
9,206✔
1128
        summary_stat!(self.duration_proc_wait += d1.elapsed());
9,206✔
1129
        let sel_index: usize = sel_oper.index() + 1; // avoid zero index
9,206✔
1130
        def1o!("{_d_p} selected {}", sel_index);
9,206✔
1131

1132
        // sanity check `*_stream_eof` is not inconsistent with channel readiness
1133
        if cfg!(any(debug_assertions,test)) {
9,206✔
1134
            if sel_index == sel_out && self.pipe_stdout_eof {
9,206✔
1135
                de_wrn!("pipe_stdout_eof should not be false if sel_out is ready");
×
1136
            }
9,206✔
1137
            if sel_index == sel_err && self.pipe_stderr_eof {
9,206✔
1138
                de_wrn!("pipe_stderr_eof should not be false if sel_err is ready");
×
1139
            }
9,206✔
1140
        }
×
1141

1142
        let mut stdout_data: Option<Bytes> = None;
9,206✔
1143
        let mut stderr_data: Option<Bytes> = None;
9,206✔
1144

1145
        // avoid borrow-checker conflicts
1146
        let _d_p = ();
9,206✔
1147

1148
        match sel_index {
9,206✔
1149
            // TODO: combine these matches since they are nearly identical?
1150
            //       though stdout might be treated differently from stderr?
1151
            //       maybe the messages passed back to main thread should distinguish
1152
            //       between stdout and stderr ?
1153
            i if i == sel_out && sel_out != 0 => {
9,206✔
1154
                // read stdout
1155
                summary_stat!(self.pipe_channel_max_stdout =
4,513✔
1156
                    max(
4,513✔
1157
                        self.pipe_channel_max_stdout,
4,513✔
1158
                        self.pipe_stdout.chunk_receiver.len() as Count
4,513✔
1159
                    )
4,513✔
1160
                );
1161
                def1o!("{} recv(&pipe_stdout.chunk_receiver)…", self._d_p);
4,513✔
1162
                summary_stat!(self.count_pipe_recv_stdout += 1);
4,513✔
1163
                match sel_oper.recv(&self.pipe_stdout.chunk_receiver) {
4,513✔
1164
                    Ok(remote_result) => {
4,513✔
1165
                        match remote_result {
4,513✔
1166
                            Ok(piped_line) => {
4,513✔
1167
                                match piped_line {
4,513✔
1168
                                    PipedChunk::Chunk(chunk) => {
4,468✔
1169
                                        let len_ = chunk.len();
4,468✔
1170
                                        def1o!("{} received {} bytes from stdout", self._d_p, len_);
4,468✔
1171
                                        stdout_data = Some(Vec::with_capacity(len_ + 1));
4,468✔
1172
                                        let data = stdout_data.as_mut().unwrap();
4,468✔
1173
                                        data.extend_from_slice(chunk.as_slice());
4,468✔
1174
                                    }
1175
                                    PipedChunk::Continue => {
1176
                                        def1o!("{} stdout Continue", self._d_p);
8✔
1177
                                    }
1178
                                    PipedChunk::Done(reads, remaining_bytes) => {
37✔
1179
                                        summary_stat!(self.count_proc_reads_stdout = reads);
37✔
1180
                                        def1o!("{} stdout Done({} reads, {} remaining bytes)",
37✔
1181
                                               self._d_p, reads, remaining_bytes.len());
37✔
1182
                                        if !remaining_bytes.is_empty() {
37✔
1183
                                            stdout_data = Some(Vec::with_capacity(remaining_bytes.len() + 1));
×
1184
                                            let data = stdout_data.as_mut().unwrap();
×
1185
                                            data.extend_from_slice(remaining_bytes.as_slice());
×
1186
                                        }
37✔
1187
                                        self.pipe_stdout_eof = true;
37✔
1188
                                        self.pipes_exit_sender(ProcessStatus::Exited);
37✔
1189
                                    }
1190
                                }
1191
                            }
1192
                            Err(error) => {
×
1193
                                de_err!("Error reading from Python process {} stdout: {:?}", self.pid_, error);
×
1194
                                self.error = Some(error);
×
1195
                                self.pipe_stdout_eof = true;
×
1196
                                self.pipes_exit_sender(ProcessStatus::Exited);
×
1197
                            }
×
1198
                        }
1199
                    }
1200
                    Err(recverror) => {
×
1201
                        def1o!("{} stdout channel RecvError {}; set pipe_stdout_eof=true", self._d_p, recverror);
×
1202
                        self.error = Some(self.new_error_from_recverror(&recverror));
×
1203
                        self.pipe_stdout_eof = true;
×
1204
                        self.pipes_exit_sender(ProcessStatus::Exited);
×
1205
                    }
1206
                }
1207
            }
1208
            i if i == sel_err && sel_err != 0 => {
4,693✔
1209
                // read stderr
1210
                summary_stat!(self.pipe_channel_max_stderr =
4,693✔
1211
                    max(
4,693✔
1212
                        self.pipe_channel_max_stderr,
4,693✔
1213
                        self.pipe_stderr.chunk_receiver.len() as Count
4,693✔
1214
                    )
4,693✔
1215
                );
1216
                def1o!("{} recv(&pipe_stderr.chunk_receiver)…", self._d_p);
4,693✔
1217
                summary_stat!(self.count_pipe_recv_stderr += 1);
4,693✔
1218
                match sel_oper.recv(&self.pipe_stderr.chunk_receiver) {
4,693✔
1219
                    Ok(remote_result) => {
4,693✔
1220
                        match remote_result {
4,693✔
1221
                            Ok(piped_line) => {
4,693✔
1222
                                match piped_line {
4,693✔
1223
                                    PipedChunk::Chunk(chunk) => {
4,640✔
1224
                                        let len_ = chunk.len();
4,640✔
1225
                                        def1o!("{} received {} bytes from stderr", self._d_p, len_);
4,640✔
1226
                                        let mut data: Bytes = Bytes::with_capacity(len_);
4,640✔
1227
                                        data.extend_from_slice(chunk.as_slice());
4,640✔
1228
                                        self.stderr_all_add(&data);
4,640✔
1229
                                        stderr_data = Some(data);
4,640✔
1230
                                    }
1231
                                    PipedChunk::Continue => {
1232
                                        def1o!("{} stderr Continue", self._d_p);
9✔
1233
                                    }
1234
                                    PipedChunk::Done(reads, remaining_bytes) => {
44✔
1235
                                        summary_stat!(self.count_proc_reads_stderr = reads);
44✔
1236
                                        def1o!("{} stderr Done({} reads, {} remaining bytes)",
44✔
1237
                                               self._d_p, reads, remaining_bytes.len());
44✔
1238
                                        if !remaining_bytes.is_empty() {
44✔
1239
                                            let mut data: Bytes = Bytes::with_capacity(remaining_bytes.len());
×
1240
                                            data.extend_from_slice(remaining_bytes.as_slice());
×
1241
                                            self.stderr_all_add(&data);
×
1242
                                            stderr_data = Some(data);
×
1243
                                        }
44✔
1244
                                        self.pipe_stderr_eof = true;
44✔
1245
                                        self.pipes_exit_sender(ProcessStatus::Exited);
44✔
1246
                                    }
1247
                                }
1248
                            }
1249
                            Err(error) => {
×
1250
                                de_err!("Error reading from Python process {} stderr: {:?}", self.pid_, error);
×
1251
                                self.error = Some(error);
×
1252
                                self.pipe_stderr_eof = true;
×
1253
                                self.pipes_exit_sender(ProcessStatus::Exited);
×
1254
                            }
×
1255
                        }
1256
                    }
1257
                    Err(_err) => {
×
1258
                        def1o!("{} stderr channel RecvError {}; set pipe_stderr_eof=true", self._d_p, _err);
×
1259
                        self.error = Some(self.new_error_from_recverror(&_err));
×
1260
                        self.pipe_stderr_eof = true;
×
1261
                        self.pipes_exit_sender(ProcessStatus::Exited);
×
1262
                    }
1263
                }
1264
            }
1265
            _i => {
×
1266
                def1o!("{} selected unknown index {}", self._d_p, _i);
×
1267
            }
1268
        }
1269

1270
        def1x!("{} return ({}, stdout bytes {:?} (eof? {}), stderr bytes {:?} (eof? {}))",
9,206✔
1271
                self._d_p,
1272
                self.exited_exhausted(),
9,206✔
1273
                stdout_data.as_ref().unwrap_or(&vec![]).len(),
9,206✔
1274
                self.pipe_stdout_eof,
1275
                stderr_data.as_ref().unwrap_or(&vec![]).len(),
9,206✔
1276
                self.pipe_stderr_eof
1277
        );
1278

1279
        (self.exited_exhausted(), stdout_data, stderr_data)
9,206✔
1280
    }
9,292✔
1281

1282
    /// Has a `subprocess::poll` or `subprocess::wait` already returned an `ExitStatus`?
1283
    pub fn exited(&self) -> bool {
9,338✔
1284
        self.exit_status.is_some()
9,338✔
1285
    }
9,338✔
1286

1287
    /// Has a `subprocess::poll` or `subprocess::wait` already returned an `ExitStatus`
1288
    /// *and* have both stdout and stderr streams reached EOF?
1289
    pub fn exited_exhausted(&self) -> bool {
18,643✔
1290
        self.exit_status.is_some() && self.pipe_stdout_eof && self.pipe_stderr_eof
18,643✔
1291
    }
18,643✔
1292

1293
    /// Wait for the Python process to exit.
1294
    /// If the process has already exited then return the saved `ExitStatus`.
1295
    pub fn wait(&mut self) -> Result<ExitStatus> {
×
1296
        let _d_p: &String = &self._d_p;
×
1297
        if self.exited() {
×
1298
            def1ñ!("{_d_p} exited; return {:?}",
×
1299
                   self.exit_status.unwrap());
×
1300
            return Ok(self.exit_status.unwrap());
×
1301
        }
×
1302
        def1n!("{_d_p} wait()");
×
1303
        let d1: Instant = Instant::now();
×
1304
        // XXX: should `wait` be passed a timeout?
1305
        let rc = self.process.wait();
×
1306
        summary_stat!(self.duration_proc_wait += d1.elapsed());
×
1307
        match rc {
×
1308
            Ok(exit_status) => {
×
1309
                debug_assert_none!(self.time_end, "time_end should not be set since exited() was false");
×
1310
                self.time_end = Some(Instant::now());
×
1311
                // prefer the first saved `ExitStatus`
1312
                // however setting `self.exit_status` again is never expected to happen
1313
                if self.exit_status.is_none() {
×
1314
                    self.exit_status = Some(exit_status);
×
1315
                } else {
×
1316
                    debug_panic!("Python process {} exit_status is already set! {:?}",
×
1317
                                 self.pid_, self.exit_status)
1318
                }
1319
                def1x!("{_d_p} wait returned {:?}", exit_status);
×
1320
                return Ok(self.exit_status.unwrap());
×
1321
            }
1322
            Err(error) => {
×
1323
                de_err!("{_d_p} wait returned {:?}", error);
×
1324
                def1x!("{_d_p} error wait returned {:?}", error);
×
1325
                return Result::Err(
×
1326
                    Error::new(
×
1327
                        error.kind(),
×
1328
                        format!("Python process {} wait() failed: {}", self.pid_, error),
×
1329
                    )
×
1330
                );
×
1331
            }
1332
        }
1333
    }
×
1334

1335
    /// Total run duration of the process; imprecise as the end time is merely the first `Instant`
1336
    /// a `subprocess::poll` or `subprocess::wait` returned an `ExitStatus`.
1337
    /// Precise enough for most needs.
1338
    ///
1339
    /// Returns `None` if the process is not yet known to have exited.
1340
    pub fn duration(&self) -> Option<Duration> {
26✔
1341
        self.time_end?;
26✔
1342
        match self.time_end {
26✔
1343
            Some(time_end) => Some(time_end - self.time_beg),
26✔
1344
            None => {
1345
                debug_panic!("Python process {} is exited but time_end is None", self.pid_);
×
1346

1347
                None
×
1348
            }
1349
        }
1350
    }
26✔
1351

1352
    /// Read from the `PyRunner`, print the output, and wait for it to finish.
1353
    /// Do not call `read` or `wait` before calling this function.
1354
    /// Helper for simple Python commands that do not require interaction.
1355
    ///
1356
    /// This is not expected to be run as part of normal operation of
1357
    /// `s4`. This is for special operations such as `s4 --venv`. It prints
1358
    /// to stdout. In normal operation of `s4`, only the main
1359
    /// thread should print to stdout.
1360
    pub fn run(&mut self, print_argv: bool, print_stdout: bool, print_stderr: bool) -> Result<(Bytes, Bytes)> {
26✔
1361
        let _d_p: &String = &self._d_p;
26✔
1362
        def1n!("{_d_p}, print_argv={}", print_argv);
26✔
1363

1364
        if self.exited() {
26✔
1365
            debug_panic!("{_d_p} already exited!");
×
1366
            return Result::Ok((Bytes::with_capacity(0), Bytes::with_capacity(0)));
×
1367
        }
26✔
1368

1369
        if print_argv {
26✔
1370
            // print the command executed
1371

1372
            // get the prompt, prefer to use PS4 env var
1373
            let prompt = match env::var("PS4") {
10✔
1374
                Ok(s) => {
×
1375
                    if s.is_empty() {
×
1376
                        String::from(PROMPT_DEFAULT)
×
1377
                    } else {
1378
                        s
×
1379
                    }
1380
                },
1381
                Err(_) => String::from(PROMPT_DEFAULT),
10✔
1382
            };
1383
            // print the command, escaping each argument
1384
            let mut lock = stdout().lock();
10✔
1385
            // TODO: [2025/11/17] handle write returning an error?
1386
            let _ = lock.write(prompt.as_bytes());
10✔
1387
            for arg in self.argv.iter() {
51✔
1388
                let es = escape(arg.into());
51✔
1389
                let _ = lock.write(es.as_bytes());
51✔
1390
                let _ = lock.write(b" ");
51✔
1391
            }
51✔
1392
            let _ = lock.write(b"\n");
10✔
1393
            let _ = lock.flush();
10✔
1394
        }
16✔
1395

1396
        let rc = self.process.wait();
26✔
1397
        def1o!("{_d_p} wait() returned {:?}", rc);
26✔
1398
        match rc {
26✔
1399
            Ok(exit_status) => {
26✔
1400
                debug_assert_none!(self.time_end, "time_end should not be set since exited() was false");
26✔
1401
                self.time_end = Some(Instant::now());
26✔
1402
                // prefer the first saved `ExitStatus`
1403
                // however setting `self.exit_status` again is never expected to happen
1404
                if self.exit_status.is_none() {
26✔
1405
                    self.exit_status = Some(exit_status);
26✔
1406
                } else {
26✔
1407
                    debug_panic!("{_d_p} exit_status is already set! {:?}",
×
1408
                                 self.exit_status)
1409
                }
1410
            }
1411
            Err(error) => {
×
1412
                de_err!("{_d_p} wait returned {:?}", error);
×
1413
                def1x!("{_d_p} error wait returned {:?}", error);
×
1414
                return Result::Err(
×
1415
                    Error::new(
×
1416
                        error.kind(),
×
1417
                        format!("Python process {} wait() failed: {}", self.pid_, error),
×
1418
                    )
×
1419
                );
×
1420
            }
1421
        }
1422

1423
        let mut stdout_data: Bytes = Bytes::with_capacity(2056);
26✔
1424
        let mut stderr_data: Bytes = Bytes::with_capacity(1024);
26✔
1425

1426
        // remove _d_p reference to avoid borrow checker conflict that would occur in the loop
1427
        let _d_p = ();
26✔
1428

1429
        // print remaining stdout and stderr
1430
        loop {
1431
            let (
1432
                _exited,
1,988✔
1433
                out_data,
1,988✔
1434
                err_data,
1,988✔
1435
            ) = self.write_read(None);
1,988✔
1436
            def1o!("{} exited? {:?}", self._d_p, _exited);
1,988✔
1437
            // print stdout to stdout
1438
            if let Some(data) = out_data {
1,988✔
1439
                stdout_data.extend_from_slice(data.as_slice());
1,883✔
1440
                if ! data.is_empty() && print_stdout {
1,883✔
1441
                    let mut lock = stdout().lock();
1,859✔
1442
                    let _ = lock.write(data.as_slice());
1,859✔
1443
                    let _ = lock.flush();
1,859✔
1444
                }
1,859✔
1445
            }
105✔
1446
            // print stderr to stderr
1447
            if let Some(data) = err_data {
1,988✔
1448
                stderr_data.extend_from_slice(data.as_slice());
53✔
1449
                if ! data.is_empty() && print_stderr {
53✔
1450
                    let mut lock = stderr().lock();
27✔
1451
                    let _ = lock.write(data.as_slice());
27✔
1452
                    let _ = lock.flush();
27✔
1453
                }
27✔
1454
            }
1,935✔
1455
            if _exited {
1,988✔
1456
                break;
26✔
1457
            }
1,962✔
1458
        }
1459

1460
        let _d_p: &String = &self._d_p;
26✔
1461

1462
        match self.exit_status {
26✔
1463
            Some(status) => {
26✔
1464
                if ! status.success() {
26✔
1465
                    let s = format!("Python process {} exited with non-zero status {:?}", self.pid_, status);
×
1466
                    def1x!("{_d_p} {}", s);
×
1467
                    return Result::Err(
×
1468
                        Error::other(s)
×
1469
                    )
×
1470
                }
26✔
1471
            }
1472
            None => {
1473
                debug_panic!("{_d_p} exit_status is None after wait()");
×
1474
            }
1475
        }
1476

1477
        if print_argv {
26✔
1478
            let mut lock = stdout().lock();
10✔
1479
            let _ = lock.write(b"\n");
10✔
1480
            let _ = lock.flush();
10✔
1481
        }
16✔
1482

1483
        def1x!("{_d_p} duration {:?}, return Ok(stdout {} bytes, stderr {} bytes)",
26✔
1484
            self.duration(), stdout_data.len(), stderr_data.len());
26✔
1485

1486
        Result::Ok((stdout_data, stderr_data))
26✔
1487
    }
26✔
1488

1489
    /// Create a `PyRunner`, run it, return Ok or Err.
1490
    ///
1491
    /// This calls `PyRunner::new()` and then `PyRunner::run()`.
1492
    /// See `run()` regarding its intended use.
1493
    pub fn run_once(
17✔
1494
        python_to_use: PythonToUse,
17✔
1495
        pipe_sz: PipeSz,
17✔
1496
        recv_timeout: Duration,
17✔
1497
        chunk_delimiter: ChunkDelimiter,
17✔
1498
        python_path: Option<FPath>,
17✔
1499
        argv: Vec<&str>,
17✔
1500
        print_argv: bool
17✔
1501
    ) -> Result<(PyRunner, Bytes, Bytes)> {
17✔
1502
        def1ñ!("({:?}, {:?}, {:?})", python_to_use, python_path, argv);
17✔
1503
        let mut pyrunner = match PyRunner::new(
17✔
1504
            python_to_use,
17✔
1505
            pipe_sz,
17✔
1506
            recv_timeout,
17✔
1507
            Some(chunk_delimiter),
17✔
1508
            None,
17✔
1509
            python_path,
17✔
1510
            argv,
17✔
1511
        ) {
17✔
1512
            Ok(pyrunner) => pyrunner,
17✔
1513
            Err(err) => {
×
1514
                def1x!("PyRunner::new failed {:?}", err);
×
1515
                return Result::Err(err);
×
1516
            }
1517
        };
1518

1519
        match pyrunner.run(print_argv, true, true) {
17✔
1520
            Ok((stdout_data, stderr_data)) => {
17✔
1521
                def1x!("PyRunner::run Ok");
17✔
1522
                return Result::Ok((pyrunner, stdout_data, stderr_data));
17✔
1523
            }
1524
            Err(err) => {
×
1525
                def1x!("PyRunner::run Error {:?}", err);
×
1526
                return Result::Err(err);
×
1527
            }
1528
        }
1529
    }
17✔
1530
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc