• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jtmoon79 / super-speedy-syslog-searcher / 20670370640

03 Jan 2026 01:35AM UTC coverage: 67.995% (+0.01%) from 67.982%
20670370640

push

github

jtmoon79
(LIB TEST) journalreader.rs use summary_stat!

24 of 33 new or added lines in 1 file covered. (72.73%)

1 existing line in 1 file now uncovered.

15664 of 23037 relevant lines covered (67.99%)

123375.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.33
/src/python/pyrunner.rs
1
// src/python/pyrunner.rs
2

3
//! Runs a Python process instance. It communicates with the Python process
4
//! over threaded `PipeStreamReader`s connected to stdout, stderr, and stdin.
5
//! It uses `std::process::Child` to start and manage the Python process.
6

7
use std::collections::{
8
    HashSet,
9
    VecDeque,
10
};
11
use std::env;
12
use std::io::{
13
    Error,
14
    ErrorKind,
15
    Read,
16
    Result,
17
    Write,
18
    stderr,
19
    stdout,
20
};
21
use std::path::PathBuf;
22
use std::process::{
23
    Child,
24
    Command,
25
    Stdio,
26
};
27
use std::sync::RwLock;
28
use std::thread;
29
use std::time::{
30
    Duration,
31
    Instant,
32
};
33

34
use ::crossbeam_channel::{
35
    Sender,
36
    Receiver,
37
    RecvError,
38
    RecvTimeoutError,
39
    Select,
40
};
41
use ::lazy_static::lazy_static;
42
use ::memchr::memmem::Finder as memchr_Finder;
43
use ::once_cell::sync::OnceCell;
44
use ::pathsearch::find_executable_in_path;
45
use ::shell_escape::escape;
46
#[allow(unused_imports)]
47
use ::si_trace_print::{
48
    defñ,
49
    defn,
50
    defo,
51
    defx,
52
    def1ñ,
53
    def1n,
54
    def1o,
55
    def1x,
56
    def2ñ,
57
    def2n,
58
    def2o,
59
    def2x,
60
    e,
61
    ef1n,
62
    ef1o,
63
    ef1x,
64
    ef1ñ,
65
    ef2n,
66
    ef2o,
67
    ef2x,
68
    ef2ñ,
69
};
70

71
use crate::{
72
    de_err,
73
    de_wrn,
74
    debug_assert_none,
75
    debug_panic,
76
};
77
use crate::common::{
78
    Bytes,
79
    Count,
80
    FPath,
81
    threadid_to_u64,
82
    summary_stat,
83
};
84
#[cfg(any(debug_assertions, test))]
85
use crate::debug::printers::buffer_to_String_noraw;
86
use crate::readers::helpers::path_to_fpath;
87
use crate::python::venv::venv_path;
88

89
/// Exit result of the Python child process; an alias of
/// [`std::process::ExitStatus`].
pub type ExitStatus = std::process::ExitStatus;

/// Byte size of a pipe read/write buffer.
pub type PipeSz = usize;

/// The byte that separates chunks of data read from the Python process.
pub type ChunkDelimiter = u8;

/// Candidate file names of a Python interpreter executable.
/// Candidates are tried in the order listed.
pub const PYTHON_NAMES: [&str; 13] = [
    "python3", "python", "python3.exe", "python.exe",
    "python37", "python38", "python39",
    "python310", "python311", "python312", "python313",
    "pypy3", "pypy",
];

/// Candidate subdirectories of a Python installation that may hold the
/// interpreter executable. The empty string stands for the installation
/// directory itself.
pub const PYTHON_SUBDIRS: [&str; 3] = ["bin", "Scripts", ""];

// NOTE(review): the use of this prompt string is not visible in this part of
// the file; confirm its purpose against the callers.
pub const PROMPT_DEFAULT: &str = "$ ";

/// Name of the environment variable holding the exact path to a Python
/// interpreter executable.
pub const PYTHON_ENV: &str = "S4_PYTHON";

/// Default timeout for Pipe `recv_timeout` calls made while reading from the
/// child Python processes.
pub const RECV_TIMEOUT: Duration = Duration::from_millis(5);
130

131
/// cached Python path found in environment variable `S4_PYTHON`.
132
/// set in `find_python_executable`
133
#[allow(non_upper_case_globals)]
134
pub static PythonPathEnv: OnceCell<Option<FPath>> = OnceCell::new();
135
/// cached Python path found in path, set in `find_python_executable`
136
#[allow(non_upper_case_globals)]
137
pub static PythonPathPath: OnceCell<Option<FPath>> = OnceCell::new();
138
/// cached Python path in s4 venv, set in `find_python_executable`
139
#[allow(non_upper_case_globals)]
140
pub static PythonPathVenv: OnceCell<Option<FPath>> = OnceCell::new();
141

142
lazy_static! {
    /// Summary statistic: the set of Python interpreter paths that ran.
    /// Intended only for summary printing.
    pub static ref PythonPathsRan: RwLock<HashSet<FPath>> = {
        defñ!("init PythonPathsRan");

        let paths_ran: HashSet<FPath> = HashSet::new();

        RwLock::new(paths_ran)
    };
}
152

153
/// Strategy for choosing which Python interpreter to use.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PythonToUse {
    /// Use only the Python named by environment variable `S4_PYTHON`.
    Env,
    /// Use only a Python found in the `PATH`.
    Path,
    /// Prefer the Python named by environment variable `S4_PYTHON`;
    /// when that is not set, fall back to a Python found in the `PATH`.
    EnvPath,
    /// Use only the Python in the predetermined s4 .venv.
    Venv,
    /// Prefer the Python named by environment variable `S4_PYTHON`;
    /// when that is not set, fall back to the Python in the predetermined
    /// s4 .venv.
    EnvVenv,
    /// Use a value passed by the caller.
    Value,
}
170

171
/// find a Python executable.
172
/// `python_to_use` instructs how to find the Python executable.
173
/// Does not check if the found Python executable is valid.
174
/// Caches found paths for reliability among threads.
175
/// Returns `None` when passed `PythonToUse::Value`.
176
pub fn find_python_executable(python_to_use: PythonToUse) -> &'static Option<FPath> {
79✔
177
    defn!("{:?}", python_to_use);
79✔
178

179
    match python_to_use {
79✔
180
        PythonToUse::Env => {
181
            let ret: &Option<FPath> = PythonPathEnv.get_or_init(||
10✔
182
                // check process environment variable
183
                match env::var(PYTHON_ENV) {
1✔
184
                    Ok(val) => {
×
185
                        defo!("env::var found {}={:?}", PYTHON_ENV, val);
×
186
                        if ! val.is_empty() {
×
187
                            Some(val)
×
188
                        } else {
189
                            None
×
190
                        }
191
                    }
192
                    Err (_err) => {
1✔
193
                        defo!("env::var did not find {:?}; {:?}", PYTHON_ENV, _err);
1✔
194
                        None
1✔
195
                    }
196
                }
1✔
197
            );
198
            defx!("{:?}, return {:?}", python_to_use, ret);
10✔
199

200
            ret
10✔
201
        }
202
        PythonToUse::Path => {
203
            let ret: &Option<FPath> = PythonPathPath.get_or_init(||{
42✔
204
                let mut python_path: Option<PathBuf> = None;
1✔
205
                // check PATH for python executable
206
                for name in PYTHON_NAMES.iter() {
1✔
207
                    defo!("find_executable_in_path({:?})", name);
1✔
208
                    if let Some(p) = find_executable_in_path(name) {
1✔
209
                        defo!("find_executable_in_path returned {:?}", p);
1✔
210
                        python_path = Some(p);
1✔
211
                        break;
1✔
212
                    };
×
213
                }
214
                if let Some(p) = python_path {
1✔
215
                    Some(path_to_fpath(p.as_path()))
1✔
216
                } else {
217
                    None
×
218
                }
219
            });
1✔
220
            defx!("{:?}, return {:?}", python_to_use, ret);
42✔
221

222
            ret
42✔
223
        }
224
        PythonToUse::EnvPath => {
225
            // try Env then try Path
226
            let p = find_python_executable(PythonToUse::Env);
1✔
227
            if p.is_some() {
1✔
228
                defx!("{:?}, return {:?}", python_to_use, p);
×
229
                return p;
×
230
            }
1✔
231
            let p = find_python_executable(PythonToUse::Path);
1✔
232
            defx!("{:?}, return {:?}", python_to_use, p);
1✔
233

234
            p
1✔
235
        }
236
        PythonToUse::Venv => {
237
            let ret: &Option<FPath> = PythonPathVenv.get_or_init(||{
17✔
238
                // get the venv path
239
                let venv: PathBuf = venv_path();
1✔
240
                defo!("venv={:?}", venv);
1✔
241
                // look for common subdirectories of Python virtual environments where the
242
                // Python executable may be found
243
                // XXX: we could try to do this by platform
244
                //      i.e. on Windows only look in "Scripts", etc.
245
                //      but this is fine
246
                for dir in PYTHON_SUBDIRS.iter() {
1✔
247
                    let mut venv_dir = venv.clone();
1✔
248
                    if ! dir.is_empty() {
1✔
249
                        venv_dir.push(dir);
1✔
250
                    }
1✔
251
                    for name in PYTHON_NAMES.iter() {
1✔
252
                        let mut venv_name = venv_dir.clone();
1✔
253
                        venv_name.push(name);
1✔
254
                        defo!("venv_name.exists?={:?}", venv_name);
1✔
255
                        if venv_name.exists() {
1✔
256
                            let fp = path_to_fpath(venv_name.as_path());
1✔
257
                            defo!("found venv python executable: {:?}", fp);
1✔
258
                            return Some(fp);
1✔
259
                        }
×
260
                    }
261
                }
262
                None
×
263
            });
1✔
264
            defx!("{:?}, return {:?}", python_to_use, ret);
17✔
265

266
            ret
17✔
267
        }
268
        PythonToUse::EnvVenv => {
269
            // try Env then try Venv
270
            let p = find_python_executable(PythonToUse::Env);
9✔
271
            if p.is_some() {
9✔
272
                defx!("{:?}, return {:?}", python_to_use, p);
×
273
                return p;
×
274
            }
9✔
275
            let p = find_python_executable(PythonToUse::Venv);
9✔
276
            defx!("{:?}, return {:?}", python_to_use, p);
9✔
277

278
            p
9✔
279
        }
280
        PythonToUse::Value => {
281
            debug_panic!("PythonToUse::Value should not be used in find_python_executable");
×
282

283
            &None
×
284
        }
285
    }
286
}
79✔
287

288
#[derive(Debug)]
289
enum PipedChunk {
290
    /// a chunk of bytes read from the child process
291
    Chunk(Bytes),
292
    /// process not sending but still running
293
    Continue,
294
    /// process exited or no more data to read
295
    /// contains number of reads performed and remaining bytes
296
    Done(u64, Bytes),
297
}
298

299
/// Status of the child process as reported to a pipe reading thread.
/// The default is `Running`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
enum ProcessStatus {
    /// The child process is still running.
    #[default]
    Running,
    /// The child process has exited.
    Exited,
}
305

306
/// Reads data from the pipe and returns chunks of data to the caller.
307
/// Churks are delimited by the passed `chunk_delimiter_opt` if set.
308
/// If `chunk_delimiter_opt` is `None` then each read immediately returns any data read.
309
///
310
/// Inspired by gist [ArtemGr/db40ae04b431a95f2b78](https://gist.github.com/ArtemGr/db40ae04b431a95f2b78).
311
struct PipeStreamReader {
312
    chunk_receiver: Receiver<core::result::Result<PipedChunk, Error>>,
313
    exit_sender: Sender<ProcessStatus>,
314
}
315

316
impl PipeStreamReader {
317
    /// Starts a thread reading bytes from a child process pipe.
318
    ///
319
    /// `pipe_sz` is the size of the Pipe chunk buffer in bytes.
320
    ///
321
    /// `recv_timeout` is the timeout duration for calls to `recv_timeout()`.
322
    ///
323
    /// `chunk_delimiter_opt` is an optional byte delimiter used to separate chunks of data.
324
    /// If `None` then each read immediately returns any data read.
325
    ///
326
    /// `stream_child_proc` is the `Read` stream of the child process to read from.
327
    ///
328
    /// `name` and `pid` are used for debugging messages.
329
    /// `name` is the name of the pipe.
330
    /// `pid` is the process ID of the child process.
331
    fn new(
134✔
332
        name: String,
134✔
333
        pid: u32,
134✔
334
        pipe_sz: PipeSz,
134✔
335
        recv_timeout: Duration,
134✔
336
        chunk_delimiter_opt: Option<ChunkDelimiter>,
134✔
337
        mut stream_child_proc: Box<dyn Read + Send>
134✔
338
    ) -> PipeStreamReader
134✔
339
    {
340
        def1n!("PipeStreamReader new(pipe_sz={}, name={:?}, chunk_delimiter_opt={:?})", pipe_sz, name, chunk_delimiter_opt);
134✔
341
        def1o!("PipeStreamReader {:?} create unbounded channel", name);
134✔
342
        let (tx_exit, rx_exit) = ::crossbeam_channel::unbounded();
134✔
343

344
        PipeStreamReader {
345
            chunk_receiver: {
346
                let thread_name: String = format!("{}_PipeStreamReader", name);
134✔
347
                let _thread_name2: String = thread_name.clone();
134✔
348
                // parent thread ID
349
                let _tidn_p: u64 = threadid_to_u64(thread::current().id());
134✔
350
                // debug message prepend
351
                let _d_p = format!(
134✔
352
                    "PipeStreamReader {:?} PID {:?} PTID {:?}",
134✔
353
                    name, pid, _tidn_p
354
                );
355
                def1o!("{_d_p} create unbounded channel");
134✔
356
                // TODO: make this bounded
357
                let (tx_parent, rx_parent) = ::crossbeam_channel::unbounded();
134✔
358

359
                let thread_pipe = thread::Builder::new().name(thread_name.clone());
134✔
360

361
                def1o!("{_d_p} spawn thread {:?}", thread_name);
134✔
362
                let result = thread_pipe.spawn(move ||
134✔
363
                {
134✔
364
                    // debug message prepend
365
                    let _d_p = format!(
134✔
366
                        "PipeStreamReader {:?} PID {:?} PTID {:?} TID {:?}",
134✔
367
                        name, pid, _tidn_p, threadid_to_u64(thread::current().id()));
134✔
368
                    def2n!("{_d_p} start, pipe_sz {}", pipe_sz);
134✔
369
                    let mut _recv_bytes: usize = 0;
134✔
370
                    let mut reads: usize = 0;
134✔
371
                    let mut _sends: usize = 0;
134✔
372
                    let mut delim_found: bool = false;
134✔
373
                    let mut buf = Bytes::with_capacity(pipe_sz);
134✔
374
                    let buf_chunk1_sz: usize = match chunk_delimiter_opt {
134✔
375
                        Some(_delim) => pipe_sz,
77✔
376
                        None => 0, // `buf_chunk1` not used
57✔
377
                    };
378
                    let mut buf_chunk1: Bytes = Bytes::with_capacity(buf_chunk1_sz);
134✔
379
                    //let mut buf_chunk2: Bytes = Bytes::with_capacity(pipe_sz);
380
                    loop {
381
                        reads += 1;
82,555✔
382
                        buf.clear();
82,555✔
383
                        buf.resize(pipe_sz, 0);
82,555✔
384

385
                        def2o!("{_d_p} stream_child_proc.read(buf capacity {}, len {})…", buf.capacity(), buf.len());
82,555✔
386
                        /*
387
                        From the docs regarding read():
388

389
                            This function does not provide any guarantees about whether it blocks
390
                            waiting for data, but if an object needs to block for a read and cannot,
391
                            it will typically signal this via an Err return value.
392

393
                        See https://doc.rust-lang.org/1.83.0/std/io/trait.Read.html#tymethod.read
394
                        */
395
                        match stream_child_proc.read(&mut buf) {
82,555✔
396
                            Ok(0) => {
397
                                def2o!("{_d_p} read zero bytes of {} total", _recv_bytes);
137✔
398
                                /*
399
                                From the docs regarding read() returning Ok(0):
400

401
                                    This reader has reached its "end of file" and will likely no
402
                                    longer be able to produce bytes. Note that this does not mean
403
                                    that the reader will always no longer be able to produce bytes.
404
                                    As an example, on Linux, this method will call the recv syscall
405
                                    for a [TcpStream], where returning zero indicates the connection
406
                                    was shut down correctly. While for [File], it is possible to
407
                                    reach the end of file and get zero as result, but if more data
408
                                    is appended to the file, future calls to read will return more
409
                                    data.
410

411
                                See https://doc.rust-lang.org/1.83.0/std/io/trait.Read.html#tymethod.read
412
                                */
413
                                if delim_found {
137✔
414
                                    delim_found = false;
53✔
415
                                }
84✔
416
                                // XXX: if the child python process has exited then this may become
417
                                //      a busy loop until the parent thread notices the python
418
                                //      process has exited and then can send a
419
                                //      `ProcessStatus::Exited`.
420
                                //      Using `recv_timeout(5ms)` softens this busy loop.
421
                                //      It's ugly but it works.
422
                                def2o!("{_d_p} rx_exit.recv_timeout({:?})…", recv_timeout);
137✔
423
                                let rx_result = rx_exit.recv_timeout(recv_timeout);
137✔
424
                                match rx_result {
88✔
425
                                    Ok(ProcessStatus::Exited) => {
426
                                        def2o!("{_d_p} rx_exit ProcessStatus::Exited; send Done({}, buf_chunk1 {} bytes) and break",
88✔
427
                                               reads, buf_chunk1.len());
88✔
428
                                        _sends += 1;
88✔
429
                                        match tx_parent.send(Ok(PipedChunk::Done(reads as u64, buf_chunk1))) {
88✔
430
                                            Ok(_) => {}
85✔
431
                                            Err(_err) => {
3✔
432
                                                def2o!("{_d_p} tx send error: {:?}", _err);
3✔
433
                                            }
434
                                        }
435
                                        break;
88✔
436
                                    }
437
                                    Ok(ProcessStatus::Running) => {
438
                                        def2o!("{_d_p} rx_exit ProcessStatus::Running; continue reading");
×
439
                                    }
440
                                    Err(RecvTimeoutError::Timeout) => {
441
                                        def2o!("{_d_p} RecvTimeoutError::Timeout; continue reading");
13✔
442
                                    }
443
                                    Err(RecvTimeoutError::Disconnected) => {
444
                                        def2o!("{_d_p} RecvTimeoutError::Disconnected; break");
36✔
445
                                        break;
36✔
446
                                    }
447
                                }
448
                                // send Continue if no more messages to process by parent thread
449
                                if tx_parent.is_empty() {
13✔
450
                                    def2o!("{_d_p} tx_parent.send(Ok(PipedChunk::Continue))…");
13✔
451
                                    match tx_parent.send(Ok(PipedChunk::Continue)) {
13✔
452
                                        Ok(_) => {
13✔
453
                                            _sends += 1;
13✔
454
                                        }
13✔
455
                                        Err(_err) => {
×
456
                                            def2o!("{_d_p} tx send error: {:?}", _err);
×
457
                                            de_err!("{_d_p} tx send error: {:?}", _err);
×
458
                                        }
459
                                    };
UNCOV
460
                                }
×
461
                            }
462
                            Ok(len) => {
82,418✔
463
                                _recv_bytes += len;
82,418✔
464
                                def2o!("{_d_p} (read #{}) read {} bytes of {} total in this pipe", reads, len, _recv_bytes);
82,418✔
465
                                // is there a chunk delimiter in the buffer?
466

467
                                match chunk_delimiter_opt {
82,418✔
468
                                    Some(chunk_delimiter) => {
79,240✔
469
                                        // look for delimiter
470
                                        // TODO: [2025/12] add benchmark to compare different methods
471
                                        //       of finding a delimiter.
472
                                        //       This `find_memchr` creates a new `Finder<'_>`
473
                                        //       which may not be worth the trouble.
474
                                        let needle = &[chunk_delimiter];
79,240✔
475
                                        let finder = memchr_Finder::new(needle);
79,240✔
476
                                        let mut at: usize = 0;
79,240✔
477
                                        let mut _loop: usize = 0;
79,240✔
478
                                        while at < len {
160,922✔
479
                                            _loop += 1;
83,779✔
480
                                            def2o!("{_d_p} (read #{reads} loop {_loop}) searching for delimiter in buf[{at}..{len}] '{}'", 
83,779✔
481
                                                buffer_to_String_noraw(&buf[at..len]));
83,779✔
482
                                            match finder.find(&buf[at..len]) {
83,779✔
483
                                                Some(pos) => {
8,194✔
484
                                                    // delimiter found at pos
485
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) found delimiter at pos {} (absolute pos {}) among {} returned bytes; buf len {}, buf capacity {}",
8,194✔
486
                                                        pos, at + pos, len, buf.len(), buf.capacity());
8,194✔
487
                                                    debug_assert!(at + pos < buf.len(), "at {} + pos {} >= buf.len {}", at, pos, buf.len());
8,194✔
488
                                                    // send chunks, keep the remainder
489
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.extend_from_slice(buf[{}..{}])", at, at + pos + 1);
8,194✔
490
                                                    buf_chunk1.extend_from_slice(&buf[at..at + pos + 1]);
8,194✔
491
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: '{}'", buffer_to_String_noraw(&buf_chunk1));
8,194✔
492
                                                    let blen = buf_chunk1.len();
8,194✔
493
                                                    let mut chunk_send: Bytes = Vec::<u8>::with_capacity(blen);
8,194✔
494
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) chunk_send.extend_from_slice(&buf_chunk1 len {}) (chunk_send capacity {})",
8,194✔
495
                                                        buf_chunk1.len(), chunk_send.capacity());
8,194✔
496
                                                    chunk_send.extend_from_slice(&buf_chunk1);
8,194✔
497
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) chunk_send: '{}'", buffer_to_String_noraw(&chunk_send));
8,194✔
498
                                                    let data_send = PipedChunk::Chunk(chunk_send);
8,194✔
499
                                                    _sends += 1;
8,194✔
500
                                                    match tx_parent.send(Ok(data_send)) {
8,194✔
501
                                                        Ok(_) => {
502
                                                            def2o!("{_d_p} (read #{reads} loop {_loop}) sent chunk_send {} bytes, send #{_sends}", blen);
6,097✔
503
                                                        }
504
                                                        Err(_err) => {
2,097✔
505
                                                            def2o!("{_d_p} (read #{reads} loop {_loop}) send error: {:?}", _err);
2,097✔
506
                                                            break;
2,097✔
507
                                                        }
508
                                                    }
509
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.clear()");
6,097✔
510
                                                    buf_chunk1.clear();
6,097✔
511
                                                    // def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1.extend_from_slice(&buf[{}..{}]) (buf len {}, buf capacity {})",
512
                                                    //     at + pos + 1, len, buf.len(), buf.capacity());
513
                                                    // buf_chunk1.extend_from_slice(&buf[at + pos + 1..len]);
514
                                                    // def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: len {}, capacity {}; contents: '{}'",
515
                                                    //     buf_chunk1.len(), buf_chunk1.capacity(), buffer_to_String_noraw(&buf_chunk1));
516
                                                    delim_found = true;
6,097✔
517
                                                    at += pos + 1;
6,097✔
518
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) {} bytes remaining in buf", len - at);
6,097✔
519
                                                }
520
                                                None => {
521
                                                    // delimiter not found, save buffer and then read child process again
522
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) no delimiter; buf_chunk1.extend_from_slice(&buf[{}..{}]) '{}'",
75,585✔
523
                                                        at, len, buffer_to_String_noraw(&buf[at..len]));
75,585✔
524
                                                    buf_chunk1.extend_from_slice(&buf[at..len]);
75,585✔
525
                                                    def2o!("{_d_p} (read #{reads} loop {_loop}) buf_chunk1: len {}, capacity {}; contents: '{}'",
75,585✔
526
                                                        buf_chunk1.len(), buf_chunk1.capacity(), buffer_to_String_noraw(&buf_chunk1));
75,585✔
527
                                                    delim_found = false;
75,585✔
528
                                                    at += len + 1;
75,585✔
529
                                                }
530
                                            }
531
                                        }
532
                                    }
533
                                    None => {
534
                                        // no delimiter configured, send entire buffer as a chunk
535
                                        let slice_ = &buf[..len];
3,178✔
536
                                        let blen = slice_.len();
3,178✔
537
                                        let mut chunk_send: Bytes = Vec::<u8>::with_capacity(blen);
3,178✔
538
                                        chunk_send.extend_from_slice(slice_);
3,178✔
539
                                        let data_send = PipedChunk::Chunk(chunk_send);
3,178✔
540
                                        delim_found = false;
3,178✔
541
                                        def2o!("{_d_p} (read #{reads}) read {} bytes of {} total; no delimiter configured, send Chunk {} bytes",
3,178✔
542
                                            len, _recv_bytes, blen);
543
                                        _sends += 1;
3,178✔
544
                                        match tx_parent.send(Ok(data_send)) {
3,178✔
545
                                            Ok(_) => {
546
                                                def2o!("{_d_p} (read #{reads}) sent chunk_send {} bytes, send #{_sends}", blen);
3,168✔
547
                                            }
548
                                            Err(_err) => {
10✔
549
                                                def2o!("{_d_p} (read #{reads}) send error: {:?}", _err);
10✔
550
                                                break;
10✔
551
                                            }
552
                                        }
553
                                    }
554
                                }
555
                            }
556
                            Err(error) => {
×
557
                                if error.kind() == ErrorKind::Interrupted {
×
558
                                    def2o!("{_d_p} (read #{reads}) read interrupted; retry");
×
559
                                    continue;
×
560
                                }
×
561
                                def2o!("{_d_p} (read #{reads}) read error {}; send Error", error);
×
562
                                delim_found = false;
×
563
                                _sends += 1;
×
564
                                match tx_parent.send(Err(error)) {
×
565
                                    Ok(_) => {}
×
566
                                    Err(_err) => {
×
567
                                        def2o!("{_d_p} (read #{reads}) send error: {:?}", _err);
×
568
                                        break;
×
569
                                    }
570
                                }
571
                            }
572
                        }
573
                    }
574
                    def2x!(
134✔
575
                        "{_d_p} exit, received {} bytes, child process reads {}, parent thread sends {}",
134✔
576
                        _recv_bytes, reads, _sends
577
                    );
578
                });
134✔
579
                match result {
134✔
580
                    Ok(_handle) => {
134✔
581
                        def1o!("{_d_p} spawned thread {:?}", _thread_name2);
134✔
582
                    }
583
                    Err(_err) => {
×
584
                        def1o!("{_d_p} thread spawn error: {}", _err);
×
585
                    }
586
                }
587
                def1x!("{_d_p} return Receiver");
134✔
588

589
                rx_parent
134✔
590
            },
591
            exit_sender: tx_exit,
134✔
592
        }
593
    }
134✔
594
}
595

596
/// `PyRunner` is a struct that represents a Python process instance. It hides
597
/// the complications of starting and communicating with a Python process
598
/// over pipes. It uses `std::process::Child` to start and manage the Python
599
/// process. This handles the complexity of asynchronous inter-process
600
/// communication which is non-trivial to implement decently.
601
///
602
/// _XXX:_ ideally, this would use `PyO3` to communicate with a Python interpreter
603
/// instance. However, [`PyO3::Python::attach`] only creates one
604
/// Python process per Rust process.
605
/// And `PyO3` does not provide a way to create Python subprocesses. So all
606
/// Rust process threads that would use `PyO3` are bottlenecked by this
607
/// one Python process which is of course, in effect, a single-threaded process.
608
/// See [PyO3 Issue #576].
609
/// So instead each `PyRunner` instance creates a new Python process using
610
/// [`std::process::Child`] and communicates over stdout, stderr, and stdin pipes.
611
///
612
/// _XXX:_ I also tried using crate `subprocess` to manage the Python process. However,
613
/// it was not able to handle irregular asynchronous communication.
614
///
615
/// [`PyO3::Python::attach`]: https://docs.rs/pyo3/0.27.1/pyo3/marker/struct.Python.html#method.attach
616
/// [PyO3 Issue #576]: https://github.com/PyO3/pyo3/issues/576
617
pub struct PyRunner {
618
    /// handle to the Python process
619
    //pub process: subprocess::Popen,
620
    pub process: Child,
621
    pipe_stdout: PipeStreamReader,
622
    pipe_stderr: PipeStreamReader,
623
    /// arguments of the process
624
    argv: Vec<String>,
625
    /// path to Python exectuable
626
    pub python_path: FPath,
627
    /// save the `ExitStatus`
628
    exit_status: Option<ExitStatus>,
629
    pipe_stdout_eof: bool,
630
    pipe_stderr_eof: bool,
631
    /// protect against sending repeat exit messages to child pipe threads.
632
    /// only used in `pipes_exit_sender()`
633
    pipe_sent_exit: bool,
634
    /// pipe buffer size in bytes for stdout `PipeStreamReader`
635
    pub pipe_sz_stdout: PipeSz,
636
    /// pipe buffer size in bytes for stderr `PipeStreamReader`
637
    pub pipe_sz_stderr: PipeSz,
638
    /// `Instant` Python process was started.
639
    time_beg: Instant,
640
    /// `Instant` the Python process was first known to be exited.
641
    time_end: Option<Instant>,
642
    /// process ID of the Python process
643
    pid_: u32,
644
    /// this thread ID. For help during debugging.
645
    _tidn: u64,
646
    /// debug message prepend. For help during debugging.
647
    _d_p: String,
648
    /// all stderr is stored in case the process exits with an error
649
    /// this is because stderr may be `read` and only later calls to
650
    /// `poll` or `wait` may find the process has exited.
651
    ///
652
    /// oldest stderr data nearest the front
653
    stderr_all: Option<VecDeque<u8>>,
654
    /// Summary statistic.
655
    /// count of reads performed by pipeline thread reading Python process stdout
656
    pub(crate) count_proc_reads_stdout: Count,
657
    /// Summary statistic.
658
    /// count of reads performed by pipeline thread reading Python process stderr
659
    pub(crate) count_proc_reads_stderr: Count,
660
    /// Summary statistic.
661
    /// count of recv of pipeline thread reading Python process stdout
662
    pub(crate) count_pipe_recv_stdout: Count,
663
    /// Summary statistic.
664
    /// count of recv of pipeline thread reading Python process stderr
665
    pub(crate) count_pipe_recv_stderr: Count,
666
    /// Summary statistic.
667
    /// count of writes to Python process stdin
668
    pub(crate) count_proc_writes: Count,
669
    /// Summary statistic.
670
    /// count of polls of Python process
671
    pub(crate) count_proc_polls: Count,
672
    /// Duration of process waiting
673
    pub(crate) duration_proc_wait: Duration,
674
    /// first seen error
675
    pub(crate) error: Option<Error>,
676
}
677

678
impl std::fmt::Debug for PyRunner {
679
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
680
        f.debug_struct("PyRunner")
×
681
            .field("process", &self.process)
×
682
            .field("argv", &self.argv)
×
683
            .field("python_path", &self.python_path)
×
684
            .field("exit_status", &self.exit_status)
×
685
            .field("pipe_stdout_eof", &self.pipe_stdout_eof)
×
686
            .field("pipe_stderr_eof", &self.pipe_stderr_eof)
×
687
            .field("pipe_sent_exit", &self.pipe_sent_exit)
×
688
            .field("pipe_sz_stdout", &self.pipe_sz_stdout)
×
689
            .field("pipe_sz_stderr", &self.pipe_sz_stderr)
×
690
            .field("time_beg", &self.time_beg)
×
691
            .field("time_end", &self.time_end)
×
692
            .field("pid_", &self.pid_)
×
693
            .field("_tidn", &self._tidn)
×
694
            .finish()
×
695
    }
×
696
}
697

698
impl PyRunner {
699
    /// Create a new `PyRunner` instance.
700
    ///
701
    /// `python_to_use` indicates which Python executable to use.
702
    /// If `PythonToUse::Value` is used then `python_path` must be `Some(FPath)`.
703
    /// Otherwise `python_path` must be `None`.
704
    ///
705
    /// `argv` is the list of arguments to pass to the Python executable.
706
    pub fn new(
67✔
707
        python_to_use: PythonToUse,
67✔
708
        pipe_sz: PipeSz,
67✔
709
        recv_timeout: Duration,
67✔
710
        chunk_delimiter_stdout: Option<ChunkDelimiter>,
67✔
711
        chunk_delimiter_stderr: Option<ChunkDelimiter>,
67✔
712
        python_path: Option<FPath>,
67✔
713
        argv: Vec<&str>
67✔
714
    ) -> Result<Self> {
67✔
715
        def1n!("python_to_use {:?}, python_path {:?}, pipe_sz {:?}, chunk_delimiter_stdout {:?}, chunk_delimiter_stderr {:?}, argv {:?}",
67✔
716
            python_to_use, python_path, pipe_sz, chunk_delimiter_stdout, chunk_delimiter_stderr, argv);
717

718
        let python_path_: &FPath;
719
        // get the Python exectuble
720
        if python_to_use == PythonToUse::Value {
67✔
721
            match &python_path {
49✔
722
                Some(val) => python_path_ = val,
49✔
723
                None => {
724
                    let s = format!("PyRunner::new: python_path must be Some(FPath) when python_to_use is Value");
×
725
                    def1x!("Error InvalidInput {}", s);
×
726
                    return Result::Err(
×
727
                        Error::new(ErrorKind::InvalidInput, s)
×
728
                    );
×
729
                }
730
            }
731
        } else {
732
            debug_assert_none!(python_path, "python_path must be None unless python_to_use is Value");
18✔
733
            python_path_ = match find_python_executable(python_to_use) {
18✔
734
                Some(s) => s,
18✔
735
                None => {
736
                    let s = format!(
×
737
                        "failed to find a Python interpreter; create the Python virtual environment with command --venv, or you may specify the Python interpreter path using environment variable {}; failed",
×
738
                        PYTHON_ENV
739
                    );
740
                    def1x!("{}", s);
×
741
                    return Result::Err(
×
742
                        Error::new(ErrorKind::NotFound, s)
×
743
                    )
×
744
                }
745
            };
746
        }
747
        def1o!("Using Python executable: {:?}", python_path_);
67✔
748

749
        // construct argv_
750
        let mut argv_: Vec<&str> = Vec::with_capacity(argv.len() + 1);
67✔
751
        argv_.push(python_path_.as_str());
67✔
752
        for arg in argv.iter() {
182✔
753
            argv_.push(arg);
182✔
754
        }
182✔
755

756
        summary_stat!(
67✔
757
            // save this python path
758
            match PythonPathsRan.write() {
67✔
759
                Ok(mut set) => {
67✔
760
                    if ! set.contains(python_path_) {
67✔
761
                        set.insert(python_path_.clone());
2✔
762
                    }
65✔
763
                }
764
                Err(err) => {
×
765
                    def1x!("Failed to acquire write lock on PythonPathsRan: {}", err);
×
766
                    return Result::Err(
×
767
                        Error::new(
×
768
                            ErrorKind::Other,
×
769
                            format!("Failed to acquire write lock on PythonPathsRan: {}", err),
×
770
                        )
×
771
                    );
×
772
                }
773
            }
774
        );
775

776
        def1o!("Command::new({:?}).args({:?}).spawn()", python_path_, Vec::from_iter(argv_.iter().skip(1)));
67✔
777
        let time_beg: Instant = Instant::now();
67✔
778
        let result = Command::new(python_path_.as_str())
67✔
779
            .args(argv_.iter().skip(1))
67✔
780
            .stdin(Stdio::piped())
67✔
781
            .stdout(Stdio::piped())
67✔
782
            .stderr(Stdio::piped())
67✔
783
            .spawn();
67✔
784
        let mut process: Child = match result {
67✔
785
            Ok(p) => p,
67✔
786
            Err(err) => {
×
787
                def1x!("Failed to start Python process: {}", err);
×
788
                return Result::Err(
×
789
                    Error::new(
×
790
                        err.kind(),
×
791
                        format!("Python process failed to start: Python path {:?}; {}",
×
792
                            python_path_, err),
×
793
                    )
×
794
                );
×
795
            }
796
        };
797

798
        // TODO: [2025/11] is there a more rustic one-liner to create Vec<String> from Vec<&str>?
799
        let mut argv: Vec<String> = Vec::with_capacity(argv_.len());
67✔
800
        for a in argv_.into_iter() {
249✔
801
            argv.push(String::from(a));
249✔
802
        }
249✔
803

804
        let pid: u32 = process.id();
67✔
805
        def1o!("Python process PID {}", pid);
67✔
806

807
        let _d_p = format!("Python process {}", pid);
67✔
808

809
        let process_stdout = match process.stdout.take() {
67✔
810
            Some(s) => s,
67✔
811
            None => {
812
                let s = format!("{_d_p} stdout was None");
×
813
                def1x!("{}", s);
×
814
                return Result::Err(
×
815
                    Error::other(s)
×
816
                );
×
817
            }
818
        };
819
        let process_stderr = match process.stderr.take() {
67✔
820
            Some(s) => s,
67✔
821
            None => {
822
                let s = format!("{_d_p} stderr was None");
×
823
                def1x!("{}", s);
×
824
                return Result::Err(
×
825
                    Error::other(s)
×
826
                );
×
827
            }
828
        };
829

830
        // create PipeStreamReaders for stdout, stderr
831
        def1o!("{_d_p} PipeStreamReader::new() stdout");
67✔
832
        let pipe_sz_stdout: usize = pipe_sz;
67✔
833
        let pipe_stdout = PipeStreamReader::new(
67✔
834
            String::from("stdout"),
67✔
835
            pid,
67✔
836
            pipe_sz_stdout,
67✔
837
            recv_timeout,
67✔
838
            chunk_delimiter_stdout,
67✔
839
            Box::new(process_stdout)
67✔
840
        );
841
        def1o!("{_d_p} PipeStreamReader::new() stderr");
67✔
842
        // stderr pipe capped at 5096 bytes
843
        let pipe_sz_stderr: usize = std::cmp::min(pipe_sz, 5096);
67✔
844
        let pipe_stderr = PipeStreamReader::new(
67✔
845
            String::from("stderr"),
67✔
846
            pid,
67✔
847
            pipe_sz_stderr,
67✔
848
            recv_timeout,
67✔
849
            chunk_delimiter_stderr,
67✔
850
            Box::new(process_stderr)
67✔
851
        );
852

853
        let _tidn: u64 = threadid_to_u64(thread::current().id());
67✔
854
        defx!("{_d_p} PyRunner created for Python process PID {}, TID {}", pid, _tidn);
67✔
855

856
        Result::Ok(Self {
67✔
857
            process,
67✔
858
            pipe_stdout,
67✔
859
            pipe_stderr,
67✔
860
            argv,
67✔
861
            python_path: python_path_.clone(),
67✔
862
            exit_status: None,
67✔
863
            pipe_stdout_eof: false,
67✔
864
            pipe_stderr_eof: false,
67✔
865
            pipe_sent_exit: false,
67✔
866
            pipe_sz_stdout,
67✔
867
            pipe_sz_stderr,
67✔
868
            pid_: pid,
67✔
869
            _tidn,
67✔
870
            _d_p,
67✔
871
            stderr_all: None,
67✔
872
            time_beg,
67✔
873
            time_end: None,
67✔
874
            count_proc_reads_stdout: 0,
67✔
875
            count_proc_reads_stderr: 0,
67✔
876
            count_pipe_recv_stdout: 0,
67✔
877
            count_pipe_recv_stderr: 0,
67✔
878
            count_proc_writes: 0,
67✔
879
            count_proc_polls: 0,
67✔
880
            duration_proc_wait: Duration::default(),
67✔
881
            error: None,
67✔
882
        })
67✔
883
    }
67✔
884

885
    #[allow(dead_code)]
886
    pub fn pid(&self) -> u32 {
×
887
        self.pid_
×
888
    }
×
889

890
    /// Returns the process exit status.
891
    /// If the process has not exited yet, returns `None`.
892
    pub fn exit_status(&self) -> Option<ExitStatus> {
10✔
893
        self.exit_status
10✔
894
    }
10✔
895

896
    /// Returns `true` if the process exited successfully.
897
    pub fn exit_okay(&self) -> bool {
151✔
898
        self.exit_status == Some(ExitStatus::default())
151✔
899
    }
151✔
900

901
    /// convert a `RecvError` into an `Error`
902
    fn new_error_from_recverror(&self, recverror: &RecvError) -> Error {
×
903
        Error::new(
×
904
            ErrorKind::Other,
×
905
            format!("Python process {} RecvError: {}",
×
906
                self.pid_, recverror),
907
        )
908
    }
×
909

910
    /// Returns all stderr data accumulated so far.
911
    /// oldest stderr data nearest the front
912
    pub fn stderr_all(&mut self) -> Option<&[u8]> {
×
913
        match &mut self.stderr_all {
×
914
            Some(v) => {
×
915
                v.make_contiguous();
×
916
                Some(v.as_slices().0)
×
917
            },
918
            None => None,
×
919
        }
920
    }
×
921

922
    /// Poll the Python process to see if it has exited.
923
    /// If the process has exited, returns `Some(ExitStatus)`.
924
    /// If the process is still running, returns `None`.
925
    pub fn poll(&mut self) -> Option<ExitStatus> {
9,287✔
926
        let _d_p: &String = &self._d_p;
9,287✔
927
        def1n!("{_d_p} poll()");
9,287✔
928

929
        summary_stat!(self.count_proc_polls += 1);
9,287✔
930

931
        match self.process.try_wait() {
9,287✔
932
            Ok(Some(exit_status)) => {
2,639✔
933
                if self.time_end.is_none() {
2,639✔
934
                    self.time_end = Some(Instant::now());
18✔
935
                    debug_assert_none!(self.exit_status, "exit_status should not be set yet");
18✔
936
                }
2,621✔
937
                // XXX: not sure if the subprocess returned ExitStatus would
938
                //      change on later polls, so only set it once
939
                let mut _was_exited = true;
2,639✔
940
                if self.exit_status.is_none() {
2,639✔
941
                    self.exit_status = Some(exit_status);
18✔
942
                    _was_exited = false;
18✔
943
                }
2,621✔
944
                if exit_status.success() {
2,639✔
945
                    def1x!("{_d_p} exited successfully{}", if _was_exited { " was" } else { "" });
2,629✔
946
                } else if let Some(_code) = exit_status.code() {
10✔
947
                    def1x!("{_d_p} exited with code {}{}", _code, if _was_exited { " was" } else { "" });
10✔
948
                } else {
949
                    def1x!("{_d_p} exited with status {:?}", exit_status);
×
950
                }
951
                self.pipes_exit_sender(ProcessStatus::Exited);
2,639✔
952

953
                Some(exit_status)
2,639✔
954
            },
955
            Ok(None) => {
956
                // Process is still alive
957
                def1x!("{_d_p} is still running");
6,648✔
958

959
                None
6,648✔
960
            },
961
            Err(err) => {
×
962
                def1x!("{_d_p} poll error: {}", err);
×
963
                self.error = Some(err);
×
964
                self.pipes_exit_sender(ProcessStatus::Exited);
×
965

966
                None
×
967
            }
968
        }
969
    }
9,287✔
970

971
    /// Accumulate stderr data up to a maximum number of bytes.
972
    // TODO: [2025/12] isn't there a more rustic way to do this?
973
    //       or a crate that does this?
974
    fn stderr_all_add(&mut self, stderr_data: &Bytes) {
4,639✔
975
        const MAX_STDERR_ALL_BYTES: usize = 1024;
976
        match self.stderr_all.as_mut() {
4,639✔
977
            Some(se_prior) => {
4,612✔
978
                // store as much as possible of prior + new stderr data
979
                if se_prior.len() + stderr_data.len() <= MAX_STDERR_ALL_BYTES {
4,612✔
980
                    se_prior.extend(stderr_data.iter());
1,621✔
981
                } else {
1,621✔
982
                    // need to drop oldest prior data from the front
983
                    let mut to_drop: usize = se_prior.len() + stderr_data.len() - MAX_STDERR_ALL_BYTES;
2,991✔
984
                    while to_drop > 0 {
75,629✔
985
                        se_prior.pop_front();
72,638✔
986
                        to_drop -= 1;
72,638✔
987
                    }
72,638✔
988
                    // signify the front data has been cut off
989
                    for b_ in "…".bytes() {
8,973✔
990
                        se_prior.push_front(b_);
8,973✔
991
                    }
8,973✔
992
                    // separate prior data from new data
993
                    se_prior.push_back(b'\n');
2,991✔
994
                    se_prior.push_back(b'\n');
2,991✔
995
                    // append the new data
996
                    se_prior.extend(stderr_data.iter());
2,991✔
997
                }
998
            },
999
            None => {
1000
                let mut v = VecDeque::<u8>::with_capacity(
27✔
1001
                    if stderr_data.len() > MAX_STDERR_ALL_BYTES {
27✔
1002
                        MAX_STDERR_ALL_BYTES
×
1003
                    } else {
1004
                        stderr_data.len()
27✔
1005
                    }
1006
                );
1007
                v.extend(stderr_data.iter());
27✔
1008
                self.stderr_all = Some(v);
27✔
1009
            }
1010
        }
1011
    }
4,639✔
1012

1013
    /// Send exit message to both stdout and stderr pipe threads.
1014
    /// May be called multiple times but only the first call has effect.
1015
    fn pipes_exit_sender(&mut self, pe: ProcessStatus) {
2,720✔
1016
        if self.pipe_sent_exit {
2,720✔
1017
            return;
2,676✔
1018
        }
44✔
1019
        def2ñ!("{} pipes_exit_sender({:?})", self._d_p, pe);
44✔
1020
        self.pipe_stdout.exit_sender.send(pe).unwrap_or(());
44✔
1021
        self.pipe_stderr.exit_sender.send(pe).unwrap_or(());
44✔
1022
        self.pipe_sent_exit = true;
44✔
1023
    }
2,720✔
1024

1025
    /// Write to `input_data` then read from the Python process stdout and
1026
    /// stderr.
1027
    /// Returns (`exited`, `stdout`, `stderr`).
1028
    ///
1029
    /// The stderr_all field accumulates all stderr data read so far. This is to help
1030
    /// when some error occurs in the Python process but the process has not yet exited
1031
    /// and then later calls to `read` find the process has exited. The earlier writes to
1032
    /// stderr are preserved in stderr_all because they often have the crucial error
1033
    /// information e.g. a Python stack trace.
1034
    pub fn write_read(&mut self, input_data: Option<&[u8]>) -> (bool, Option<Bytes>, Option<Bytes>) {
9,287✔
1035
        let _len = input_data.unwrap_or(&[]).len();
9,287✔
1036
        def1n!("{} input_data: {} bytes", self._d_p, _len);
9,287✔
1037

1038
        if let Some(_exit_status) = self.poll() {
9,287✔
1039
            def1o!("{} already exited before read", self._d_p);
2,639✔
1040
        }
6,648✔
1041

1042
        // write string, read from stdout and stderr after poll as there may still be data to read
1043
        // even if the process has exited
1044

1045
        // write to stdin
1046
        if !self.exited() {
9,287✔
1047
            if let Some(input_data_) = input_data {
6,648✔
1048
                if !input_data_.is_empty() {
27✔
1049
                    match self.process.stdin.as_mut() {
27✔
1050
                        Some(stdin) => {
27✔
1051
                            def1o!(
27✔
1052
                                "{} writing {} bytes to stdin (\"{}\")",
27✔
1053
                                self._d_p,
1054
                                input_data_.len(),
27✔
1055
                                buffer_to_String_noraw(&input_data_[..input_data_.len().min(10)]).to_string()
27✔
1056
                            );
1057
                            match stdin.write(input_data_) {
27✔
1058
                                Ok(_len) => {
27✔
1059
                                    summary_stat!(self.count_proc_writes += 1);
27✔
1060
                                    def1o!(
27✔
1061
                                        "{} wrote {} bytes to stdin, expected {} bytes",
27✔
1062
                                        self._d_p, _len, input_data_.len()
27✔
1063
                                    );
1064
                                }
1065
                                Err(_err) => {
×
1066
                                    de_err!("Error writing to Python process {} stdin: {:?}", self.pid_, _err);
×
1067
                                    self.pipes_exit_sender(ProcessStatus::Exited);
×
1068
                                }
×
1069
                            }
1070
                        }
1071
                        None => {
×
1072
                            de_err!("{} stdin is None", self._d_p);
×
1073
                        }
×
1074
                    }
1075
                } else {
1076
                    def1o!("{} no stdin data to write", self._d_p);
×
1077
                }
1078
            }
6,621✔
1079
        } else {
1080
            def1o!("{} has exited; skip writing to stdin", self._d_p);
2,639✔
1081
        }
1082

1083
        let _d_p: &String = &self._d_p;
9,287✔
1084
        // use select to block until either channel signals data is available
1085
        let mut sel = Select::new();
9,287✔
1086
        let mut _sel_counts: usize = 0;
9,287✔
1087
        let sel_out: usize = if !self.pipe_stdout_eof {
9,287✔
1088
            let id = sel.recv(&self.pipe_stdout.chunk_receiver) + 1; // avoid zero id
9,158✔
1089
            def1o!("{_d_p} select recv(&pipe_stdout.chunk_receiver)");
9,158✔
1090
            _sel_counts += 1;
9,158✔
1091
            id
9,158✔
1092
        } else { 0 };
129✔
1093
        let sel_err: usize = if !self.pipe_stderr_eof {
9,287✔
1094
            let id = sel.recv(&self.pipe_stderr.chunk_receiver) + 1; // avoid zero id
6,786✔
1095
            def1o!("{_d_p} select recv(&pipe_stderr.chunk_receiver)");
6,786✔
1096
            _sel_counts += 1;
6,786✔
1097
            id
6,786✔
1098
        } else { 0 };
2,501✔
1099

1100
        if sel_out == 0 && sel_err == 0 {
9,287✔
1101
            def1o!("{_d_p} both stdout and stderr EOF; return");
86✔
1102
            return (self.exited_exhausted(), None, None);
86✔
1103
        }
9,201✔
1104

1105
        def1o!("{_d_p} wait on {} selects…", _sel_counts);
9,201✔
1106
        let d1: Instant = Instant::now();
9,201✔
1107
        let sel_oper = sel.select();
9,201✔
1108
        self.duration_proc_wait += d1.elapsed();
9,201✔
1109
        let sel_index: usize = sel_oper.index() + 1; // avoid zero index
9,201✔
1110
        def1o!("{_d_p} selected {}", sel_index);
9,201✔
1111

1112
        // sanity check `*_stream_eof` is not inconsistent with channel readiness
1113
        if cfg!(any(debug_assertions,test)) {
9,201✔
1114
            if sel_index == sel_out && self.pipe_stdout_eof {
9,201✔
1115
                de_wrn!("pipe_stdout_eof should not be false if sel_out is ready");
×
1116
            }
9,201✔
1117
            if sel_index == sel_err && self.pipe_stderr_eof {
9,201✔
1118
                de_wrn!("pipe_stderr_eof should not be false if sel_err is ready");
×
1119
            }
9,201✔
1120
        }
×
1121

1122
        let mut stdout_data: Option<Bytes> = None;
9,201✔
1123
        let mut stderr_data: Option<Bytes> = None;
9,201✔
1124

1125
        // avoid borrow-checker conflicts
1126
        let _d_p = ();
9,201✔
1127

1128
        match sel_index {
9,201✔
1129
            // TODO: combine these matches since they are nearly identical?
1130
            //       though stdout might be treated differently from stderr?
1131
            //       maybe the messages passed back to main thread should distinguish
1132
            //       between stdout and stderr ?
1133
            i if i == sel_out && sel_out != 0 => {
9,201✔
1134
                // read stdout
1135
                def1o!("{} recv(&pipe_stdout.chunk_receiver)…", self._d_p);
4,511✔
1136
                summary_stat!(self.count_pipe_recv_stdout += 1);
4,511✔
1137
                match sel_oper.recv(&self.pipe_stdout.chunk_receiver) {
4,511✔
1138
                    Ok(remote_result) => {
4,511✔
1139
                        match remote_result {
4,511✔
1140
                            Ok(piped_line) => {
4,511✔
1141
                                match piped_line {
4,511✔
1142
                                    PipedChunk::Chunk(chunk) => {
4,468✔
1143
                                        let len_ = chunk.len();
4,468✔
1144
                                        def1o!("{} received {} bytes from stdout", self._d_p, len_);
4,468✔
1145
                                        stdout_data = Some(Vec::with_capacity(len_ + 1));
4,468✔
1146
                                        let data = stdout_data.as_mut().unwrap();
4,468✔
1147
                                        data.extend_from_slice(chunk.as_slice());
4,468✔
1148
                                    }
1149
                                    PipedChunk::Continue => {
1150
                                        def1o!("{} stdout Continue", self._d_p);
6✔
1151
                                    }
1152
                                    PipedChunk::Done(reads, remaining_bytes) => {
37✔
1153
                                        summary_stat!(self.count_proc_reads_stdout = reads);
37✔
1154
                                        def1o!("{} stdout Done({} reads, {} remaining bytes)",
37✔
1155
                                               self._d_p, reads, remaining_bytes.len());
37✔
1156
                                        if !remaining_bytes.is_empty() {
37✔
1157
                                            stdout_data = Some(Vec::with_capacity(remaining_bytes.len() + 1));
×
1158
                                            let data = stdout_data.as_mut().unwrap();
×
1159
                                            data.extend_from_slice(remaining_bytes.as_slice());
×
1160
                                        }
37✔
1161
                                        self.pipe_stdout_eof = true;
37✔
1162
                                        self.pipes_exit_sender(ProcessStatus::Exited);
37✔
1163
                                    }
1164
                                }
1165
                            }
1166
                            Err(error) => {
×
1167
                                de_err!("Error reading from Python process {} stdout: {:?}", self.pid_, error);
×
1168
                                self.error = Some(error);
×
1169
                                self.pipe_stdout_eof = true;
×
1170
                                self.pipes_exit_sender(ProcessStatus::Exited);
×
1171
                            }
×
1172
                        }
1173
                    }
1174
                    Err(recverror) => {
×
1175
                        def1o!("{} stdout channel RecvError {}; set pipe_stdout_eof=true", self._d_p, recverror);
×
1176
                        self.error = Some(self.new_error_from_recverror(&recverror));
×
1177
                        self.pipe_stdout_eof = true;
×
1178
                        self.pipes_exit_sender(ProcessStatus::Exited);
×
1179
                    }
1180
                }
1181
            }
1182
            i if i == sel_err && sel_err != 0 => {
4,690✔
1183
                // read stderr
1184
                def1o!("{} recv(&pipe_stderr.chunk_receiver)…", self._d_p);
4,690✔
1185
                summary_stat!(self.count_pipe_recv_stderr += 1);
4,690✔
1186
                match sel_oper.recv(&self.pipe_stderr.chunk_receiver) {
4,690✔
1187
                    Ok(remote_result) => {
4,690✔
1188
                        match remote_result {
4,690✔
1189
                            Ok(piped_line) => {
4,690✔
1190
                                match piped_line {
4,690✔
1191
                                    PipedChunk::Chunk(chunk) => {
4,639✔
1192
                                        let len_ = chunk.len();
4,639✔
1193
                                        def1o!("{} received {} bytes from stderr", self._d_p, len_);
4,639✔
1194
                                        let mut data: Bytes = Bytes::with_capacity(len_);
4,639✔
1195
                                        data.extend_from_slice(chunk.as_slice());
4,639✔
1196
                                        self.stderr_all_add(&data);
4,639✔
1197
                                        stderr_data = Some(data);
4,639✔
1198
                                    }
1199
                                    PipedChunk::Continue => {
1200
                                        def1o!("{} stderr Continue", self._d_p);
7✔
1201
                                    }
1202
                                    PipedChunk::Done(reads, remaining_bytes) => {
44✔
1203
                                        summary_stat!(self.count_proc_reads_stderr = reads);
44✔
1204
                                        def1o!("{} stderr Done({} reads, {} remaining bytes)",
44✔
1205
                                               self._d_p, reads, remaining_bytes.len());
44✔
1206
                                        if !remaining_bytes.is_empty() {
44✔
1207
                                            let mut data: Bytes = Bytes::with_capacity(remaining_bytes.len());
×
1208
                                            data.extend_from_slice(remaining_bytes.as_slice());
×
1209
                                            self.stderr_all_add(&data);
×
1210
                                            stderr_data = Some(data);
×
1211
                                        }
44✔
1212
                                        self.pipe_stderr_eof = true;
44✔
1213
                                        self.pipes_exit_sender(ProcessStatus::Exited);
44✔
1214
                                    }
1215
                                }
1216
                            }
1217
                            Err(error) => {
×
1218
                                de_err!("Error reading from Python process {} stderr: {:?}", self.pid_, error);
×
1219
                                self.error = Some(error);
×
1220
                                self.pipe_stderr_eof = true;
×
1221
                                self.pipes_exit_sender(ProcessStatus::Exited);
×
1222
                            }
×
1223
                        }
1224
                    }
1225
                    Err(_err) => {
×
1226
                        def1o!("{} stderr channel RecvError {}; set pipe_stderr_eof=true", self._d_p, _err);
×
1227
                        self.error = Some(self.new_error_from_recverror(&_err));
×
1228
                        self.pipe_stderr_eof = true;
×
1229
                        self.pipes_exit_sender(ProcessStatus::Exited);
×
1230
                    }
1231
                }
1232
            }
1233
            _i => {
×
1234
                def1o!("{} selected unknown index {}", self._d_p, _i);
×
1235
            }
1236
        }
1237

1238
        def1x!("{} return ({}, stdout bytes {:?} (eof? {}), stderr bytes {:?} (eof? {}))",
9,201✔
1239
                self._d_p,
1240
                self.exited_exhausted(),
9,201✔
1241
                stdout_data.as_ref().unwrap_or(&vec![]).len(),
9,201✔
1242
                self.pipe_stdout_eof,
1243
                stderr_data.as_ref().unwrap_or(&vec![]).len(),
9,201✔
1244
                self.pipe_stderr_eof
1245
        );
1246

1247
        (self.exited_exhausted(), stdout_data, stderr_data)
9,201✔
1248
    }
9,287✔
1249

1250
    /// Has a `subprocess::poll` or `subprocess::wait` already returned an `ExitStatus`?
1251
    pub fn exited(&self) -> bool {
9,333✔
1252
        self.exit_status.is_some()
9,333✔
1253
    }
9,333✔
1254

1255
    /// Has a `subprocess::poll` or `subprocess::wait` already returned an `ExitStatus`
1256
    /// *and* have both stdout and stderr streams reached EOF?
1257
    pub fn exited_exhausted(&self) -> bool {
18,629✔
1258
        self.exit_status.is_some() && self.pipe_stdout_eof && self.pipe_stderr_eof
18,629✔
1259
    }
18,629✔
1260

1261
    /// Wait for the Python process to exit.
1262
    /// If the process has already exited then return the saved `ExitStatus`.
1263
    pub fn wait(&mut self) -> Result<ExitStatus> {
×
1264
        let _d_p: &String = &self._d_p;
×
1265
        if self.exited() {
×
1266
            def1ñ!("{_d_p} exited; return {:?}",
×
1267
                   self.exit_status.unwrap());
×
1268
            return Ok(self.exit_status.unwrap());
×
1269
        }
×
1270
        // XXX: should `wait` be passed a timeout?
1271
        def1n!("{_d_p} wait()");
×
1272
        let d1: Instant = Instant::now();
×
1273
        let rc = self.process.wait();
×
1274
        self.duration_proc_wait += d1.elapsed();
×
1275
        match rc {
×
1276
            Ok(exit_status) => {
×
1277
                debug_assert_none!(self.time_end, "time_end should not be set since exited() was false");
×
1278
                self.time_end = Some(Instant::now());
×
1279
                // prefer the first saved `ExitStatus`
1280
                // however setting `self.exit_status` again is never expected to happen
1281
                if self.exit_status.is_none() {
×
1282
                    self.exit_status = Some(exit_status);
×
1283
                } else {
×
1284
                    debug_panic!("Python process {} exit_status is already set! {:?}",
×
1285
                                 self.pid_, self.exit_status)
1286
                }
1287
                def1x!("{_d_p} wait returned {:?}", exit_status);
×
1288
                return Ok(self.exit_status.unwrap());
×
1289
            }
1290
            Err(error) => {
×
1291
                de_err!("{_d_p} wait returned {:?}", error);
×
1292
                def1x!("{_d_p} error wait returned {:?}", error);
×
1293
                return Result::Err(
×
1294
                    Error::new(
×
1295
                        error.kind(),
×
1296
                        format!("Python process {} wait() failed: {}", self.pid_, error),
×
1297
                    )
×
1298
                );
×
1299
            }
1300
        }
1301
    }
×
1302

1303
    /// Total run duration of the process; imprecise as the end time is merely the first `Instant`
1304
    /// a `subprocess::poll` or `subprocess::wait` returned an `ExitStatus`.
1305
    /// Precise enough for most needs.
1306
    ///
1307
    /// Returns `None` if the process is not yet known to have exited.
1308
    pub fn duration(&self) -> Option<Duration> {
26✔
1309
        self.time_end?;
26✔
1310
        match self.time_end {
26✔
1311
            Some(time_end) => Some(time_end - self.time_beg),
26✔
1312
            None => {
1313
                debug_panic!("Python process {} is exited but time_end is None", self.pid_);
×
1314

1315
                None
×
1316
            }
1317
        }
1318
    }
26✔
1319

1320
    /// Read from the `PyRunner`, print the output, and wait for it to finish.
1321
    /// Do not call `read` or `wait` before calling this function.
1322
    /// Helper for simple Python commands that do not require interaction.
1323
    ///
1324
    /// This is not expected to be run as part of normal operation of
1325
    /// `s4`. This is for special operations such as `s4 --venv`. It prints
1326
    /// to stdout. In normal operation of `s4`, only the main
1327
    /// thread should print to stdout.
1328
    pub fn run(&mut self, print_argv: bool, print_stdout: bool, print_stderr: bool) -> Result<(Bytes, Bytes)> {
26✔
1329
        let _d_p: &String = &self._d_p;
26✔
1330
        def1n!("{_d_p}, print_argv={}", print_argv);
26✔
1331

1332
        if self.exited() {
26✔
1333
            debug_panic!("{_d_p} already exited!");
×
1334
            return Result::Ok((Bytes::with_capacity(0), Bytes::with_capacity(0)));
×
1335
        }
26✔
1336

1337
        if print_argv {
26✔
1338
            // print the command executed
1339

1340
            // get the prompt, prefer to use PS4 env var
1341
            let prompt = match env::var("PS4") {
10✔
1342
                Ok(s) => {
×
1343
                    if s.is_empty() {
×
1344
                        String::from(PROMPT_DEFAULT)
×
1345
                    } else {
1346
                        s
×
1347
                    }
1348
                },
1349
                Err(_) => String::from(PROMPT_DEFAULT),
10✔
1350
            };
1351
            // print the command, escaping each argument
1352
            let mut lock = stdout().lock();
10✔
1353
            // TODO: [2025/11/17] handle write returning an error?
1354
            let _ = lock.write(prompt.as_bytes());
10✔
1355
            for arg in self.argv.iter() {
51✔
1356
                let es = escape(arg.into());
51✔
1357
                let _ = lock.write(es.as_bytes());
51✔
1358
                let _ = lock.write(b" ");
51✔
1359
            }
51✔
1360
            let _ = lock.write(b"\n");
10✔
1361
            let _ = lock.flush();
10✔
1362
        }
16✔
1363

1364
        let rc = self.process.wait();
26✔
1365
        def1o!("{_d_p} wait() returned {:?}", rc);
26✔
1366
        match rc {
26✔
1367
            Ok(exit_status) => {
26✔
1368
                debug_assert_none!(self.time_end, "time_end should not be set since exited() was false");
26✔
1369
                self.time_end = Some(Instant::now());
26✔
1370
                // prefer the first saved `ExitStatus`
1371
                // however setting `self.exit_status` again is never expected to happen
1372
                if self.exit_status.is_none() {
26✔
1373
                    self.exit_status = Some(exit_status);
26✔
1374
                } else {
26✔
1375
                    debug_panic!("{_d_p} exit_status is already set! {:?}",
×
1376
                                 self.exit_status)
1377
                }
1378
            }
1379
            Err(error) => {
×
1380
                de_err!("{_d_p} wait returned {:?}", error);
×
1381
                def1x!("{_d_p} error wait returned {:?}", error);
×
1382
                return Result::Err(
×
1383
                    Error::new(
×
1384
                        error.kind(),
×
1385
                        format!("Python process {} wait() failed: {}", self.pid_, error),
×
1386
                    )
×
1387
                );
×
1388
            }
1389
        }
1390

1391
        let mut stdout_data: Bytes = Bytes::with_capacity(2056);
26✔
1392
        let mut stderr_data: Bytes = Bytes::with_capacity(1024);
26✔
1393

1394
        // remove _d_p reference to avoid borrow checker conflict that would occur in the loop
1395
        let _d_p = ();
26✔
1396

1397
        // print remaining stdout and stderr
1398
        loop {
1399
            let (
1400
                _exited,
1,987✔
1401
                out_data,
1,987✔
1402
                err_data,
1,987✔
1403
            ) = self.write_read(None);
1,987✔
1404
            def1o!("{} exited? {:?}", self._d_p, _exited);
1,987✔
1405
            // print stdout to stdout
1406
            if let Some(data) = out_data {
1,987✔
1407
                stdout_data.extend_from_slice(data.as_slice());
1,882✔
1408
                if ! data.is_empty() && print_stdout {
1,882✔
1409
                    let mut lock = stdout().lock();
1,858✔
1410
                    let _ = lock.write(data.as_slice());
1,858✔
1411
                    let _ = lock.flush();
1,858✔
1412
                }
1,858✔
1413
            }
105✔
1414
            // print stderr to stderr
1415
            if let Some(data) = err_data {
1,987✔
1416
                stderr_data.extend_from_slice(data.as_slice());
53✔
1417
                if ! data.is_empty() && print_stderr {
53✔
1418
                    let mut lock = stderr().lock();
27✔
1419
                    let _ = lock.write(data.as_slice());
27✔
1420
                    let _ = lock.flush();
27✔
1421
                }
27✔
1422
            }
1,934✔
1423
            if _exited {
1,987✔
1424
                break;
26✔
1425
            }
1,961✔
1426
        }
1427

1428
        let _d_p: &String = &self._d_p;
26✔
1429

1430
        match self.exit_status {
26✔
1431
            Some(status) => {
26✔
1432
                if ! status.success() {
26✔
1433
                    let s = format!("Python process {} exited with non-zero status {:?}", self.pid_, status);
×
1434
                    def1x!("{_d_p} {}", s);
×
1435
                    return Result::Err(
×
1436
                        Error::other(s)
×
1437
                    )
×
1438
                }
26✔
1439
            }
1440
            None => {
1441
                debug_panic!("{_d_p} exit_status is None after wait()");
×
1442
            }
1443
        }
1444

1445
        if print_argv {
26✔
1446
            let mut lock = stdout().lock();
10✔
1447
            let _ = lock.write(b"\n");
10✔
1448
            let _ = lock.flush();
10✔
1449
        }
16✔
1450

1451
        def1x!("{_d_p} duration {:?}, return Ok(stdout {} bytes, stderr {} bytes)",
26✔
1452
            self.duration(), stdout_data.len(), stderr_data.len());
26✔
1453

1454
        Result::Ok((stdout_data, stderr_data))
26✔
1455
    }
26✔
1456

1457
    /// Create a `PyRunner`, run it, return Ok or Err.
1458
    ///
1459
    /// This calls `PyRunner::new()` and then `PyRunner::run()`.
1460
    /// See `run()` regarding its intended use.
1461
    pub fn run_once(
17✔
1462
        python_to_use: PythonToUse,
17✔
1463
        pipe_sz: PipeSz,
17✔
1464
        recv_timeout: Duration,
17✔
1465
        chunk_delimiter: ChunkDelimiter,
17✔
1466
        python_path: Option<FPath>,
17✔
1467
        argv: Vec<&str>,
17✔
1468
        print_argv: bool
17✔
1469
    ) -> Result<(PyRunner, Bytes, Bytes)> {
17✔
1470
        def1ñ!("({:?}, {:?}, {:?})", python_to_use, python_path, argv);
17✔
1471
        let mut pyrunner = match PyRunner::new(
17✔
1472
            python_to_use,
17✔
1473
            pipe_sz,
17✔
1474
            recv_timeout,
17✔
1475
            Some(chunk_delimiter),
17✔
1476
            None,
17✔
1477
            python_path,
17✔
1478
            argv,
17✔
1479
        ) {
17✔
1480
            Ok(pyrunner) => pyrunner,
17✔
1481
            Err(err) => {
×
1482
                def1x!("PyRunner::new failed {:?}", err);
×
1483
                return Result::Err(err);
×
1484
            }
1485
        };
1486

1487
        match pyrunner.run(print_argv, true, true) {
17✔
1488
            Ok((stdout_data, stderr_data)) => {
17✔
1489
                def1x!("PyRunner::run Ok");
17✔
1490
                return Result::Ok((pyrunner, stdout_data, stderr_data));
17✔
1491
            }
1492
            Err(err) => {
×
1493
                def1x!("PyRunner::run Error {:?}", err);
×
1494
                return Result::Err(err);
×
1495
            }
1496
        }
1497
    }
17✔
1498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc