• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 21724486044

05 Feb 2026 06:57PM UTC coverage: 58.407% (-0.2%) from 58.629%
21724486044

push

github

lpenz
Cargo.*: increment version to 2.3.3

462 of 791 relevant lines covered (58.41%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.67
/src/process_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Wrapper for process functions.
6
//!
7
//! # `Cmd`
8
//!
9
//! The [`Cmd`] type has an inner `Vec<String>` that we can turn into
10
//! a [`tokio::process::Command`]. It implements `Clone`, which we use
11
//! to spawn the same process multiple times.
12
//!
13
//! # `ProcessStream`
14
//!
15
//! The [`ProcessStream`] type wraps [`tokio_process_stream`] in order
16
//! to provide an [`Item`] that implements `Eq` which we can then use
17
//! for testing.
18

19
use color_eyre::Result;
20
use nix::sys::signal::Signal;
21
use std::collections::VecDeque;
22
use std::fmt;
23
use std::io;
24
use std::os::unix::process::ExitStatusExt;
25
use std::pin::Pin;
26
use std::process::ExitStatus;
27
use std::process::Stdio;
28
use std::task::{Context, Poll};
29
use tokio::process::Child;
30
use tokio::process::Command;
31
use tokio_process_stream as tps;
32
use tokio_stream::Stream;
33
use tracing::instrument;
34

35
// Command wrapper ///////////////////////////////////////////////////
36

37
/// Cloneable description of a command line.
///
/// Holds the program name followed by its arguments as plain strings, so
/// the same description can be converted into a fresh
/// [`tokio::process::Command`] every time the process has to be spawned.
#[derive(Clone, Debug, Default)]
pub struct Cmd(Vec<String>);
40

41
impl From<&Cmd> for Command {
42
    fn from(cmd: &Cmd) -> Command {
7✔
43
        let mut command = Command::new(&cmd.0[0]);
7✔
44
        command.args(cmd.0.iter().skip(1));
7✔
45
        command.stdin(Stdio::null());
7✔
46
        command.stdout(Stdio::piped());
7✔
47
        command.stderr(Stdio::piped());
7✔
48
        command
7✔
49
    }
7✔
50
}
51

52
impl From<Vec<String>> for Cmd {
53
    fn from(s: Vec<String>) -> Cmd {
2✔
54
        Self(s)
2✔
55
    }
2✔
56
}
57

58
impl From<&[&str]> for Cmd {
59
    fn from(s: &[&str]) -> Cmd {
5✔
60
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
8✔
61
    }
5✔
62
}
63

64
impl fmt::Display for Cmd {
65
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1✔
66
        let joined = self.0.join(" ");
1✔
67
        write!(f, "{joined}")
1✔
68
    }
1✔
69
}
70

71
// Exit status ///////////////////////////////////////////////////////
72

73
/// Comparable, cloneable summary of a [`std::process::ExitStatus`],
/// with its own `Display` implementation.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ExitSts {
    /// The process reported success.
    #[default]
    Success,
    /// The process exited unsuccessfully with this exit code.
    Code(u8),
    /// The process was terminated by this signal number.
    Signal(i32),
}

impl ExitSts {
    /// Return `true` only for [`ExitSts::Success`].
    pub fn success(&self) -> bool {
        matches!(self, ExitSts::Success)
    }
}

impl From<ExitStatus> for ExitSts {
    /// Classify an [`ExitStatus`] as success, exit code or signal.
    ///
    /// # Panics
    ///
    /// Panics if the status is not successful and carries neither an exit
    /// code nor a signal.
    fn from(sts: ExitStatus) -> ExitSts {
        if sts.success() {
            return ExitSts::Success;
        }
        // An exit code takes precedence over a signal, when both are checked.
        match (sts.code(), sts.signal()) {
            (Some(code), _) => ExitSts::Code(code as u8),
            (None, Some(signal)) => ExitSts::Signal(signal),
            (None, None) => panic!("Unable to figure out exit status {sts:?}"),
        }
    }
}
102

103
impl fmt::Display for ExitSts {
104
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
105
        match self {
×
106
            ExitSts::Success => write!(f, "success"),
×
107
            ExitSts::Code(code) => write!(f, "code {code}"),
×
108
            ExitSts::Signal(signal) => {
×
109
                if let Ok(s) = Signal::try_from(*signal) {
×
110
                    write!(f, "signal {signal} ({s})")
×
111
                } else {
112
                    write!(f, "signal {signal}")
×
113
                }
114
            }
115
        }
116
    }
×
117
}
118

119
// ProcessStream /////////////////////////////////////////////////////
120

121
/// A clonable, Eq replacement for [`tokio_process_stream::Item`]
122
#[derive(Debug, Clone, PartialEq, Eq)]
123
pub enum Item {
124
    /// A stdout line printed by the process.
125
    Stdout(String),
126
    /// A stderr line printed by the process.
127
    Stderr(String),
128
    /// The [`ExitSts`], yielded after the process exits.
129
    Done(Result<ExitSts, io::ErrorKind>),
130
}
131

132
impl From<tps::Item<String>> for Item {
133
    fn from(item: tps::Item<String>) -> Self {
7✔
134
        match item {
7✔
135
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
136
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
137
            tps::Item::Done(result) => Item::Done(match result {
5✔
138
                Ok(sts) => Ok(sts.into()),
5✔
139
                Err(err) => Err(err.kind()),
×
140
            }),
141
        }
142
    }
7✔
143
}
144

145
/// A wrapper for [`tokio_process_stream::ProcessLineStream`].
146
///
147
/// Also provides a virtual implementation for use in tests.
148
pub enum ProcessStream {
149
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
150
    Real { stream: Box<tps::ProcessLineStream> },
151
    /// Mock for a running process stream that just returns items from
152
    /// a list. Useful for testing.
153
    Virtual { items: VecDeque<Item> },
154
}
155

156
impl ProcessStream {
157
    /// Return a mutable reference to the child object
158
    pub fn child_mut(&mut self) -> Option<&mut Child> {
×
159
        if let ProcessStream::Real { stream } = self {
×
160
            stream.child_mut()
×
161
        } else {
162
            None
×
163
        }
164
    }
×
165
}
166

167
impl std::fmt::Debug for ProcessStream {
168
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
169
        match self {
1✔
170
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
171
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
1✔
172
        }
173
        .finish()
1✔
174
    }
1✔
175
}
176

177
impl From<tps::ProcessLineStream> for ProcessStream {
178
    fn from(stream: tps::ProcessLineStream) -> Self {
5✔
179
        ProcessStream::Real {
5✔
180
            stream: Box::new(stream),
5✔
181
        }
5✔
182
    }
5✔
183
}
184

185
impl From<VecDeque<Item>> for ProcessStream {
186
    fn from(items: VecDeque<Item>) -> Self {
3✔
187
        ProcessStream::Virtual { items }
3✔
188
    }
3✔
189
}
190

191
impl Stream for ProcessStream {
192
    type Item = Item;
193

194
    #[instrument(level = "debug", ret, skip(cx))]
195
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
23✔
196
        let this = self.get_mut();
197
        match this {
198
            ProcessStream::Real { stream } => {
199
                let next = Pin::new(stream).poll_next(cx);
200
                match next {
201
                    Poll::Ready(opt) => Poll::Ready(opt.map(|i| i.into())),
7✔
202
                    Poll::Pending => Poll::Pending,
203
                }
204
            }
205
            ProcessStream::Virtual { items } => Poll::Ready(items.pop_front()),
206
        }
207
    }
23✔
208
}
209

210
// Tests /////////////////////////////////////////////////////////////
211

212
/// Tests that spawn small real commands and check the items streamed back.
#[cfg(test)]
pub mod test {
    use color_eyre::Result;
    use color_eyre::eyre::eyre;
    use tokio_stream::StreamExt;

    use super::*;

    /// Spawn the command described by `cmdstr` and return its item stream.
    async fn stream_cmd(
        cmdstr: &[&str],
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
        let cmd = Cmd::from(cmdstr);
        let process_stream = tps::ProcessLineStream::try_from(Command::from(&cmd))?;
        Ok(ProcessStream::from(process_stream))
    }

    /// Wait for the next item, turning an exhausted stream into an error.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
    {
        // `ok_or_else` builds the error report only when the stream is
        // really exhausted; `ok_or` would build it on every call.
        stream
            .next()
            .await
            .ok_or_else(|| eyre!("no item received"))
    }

    /// `true` prints nothing and succeeds; the stream then ends.
    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// `false` prints nothing and reports a non-success status.
    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(!sts.unwrap().success());
        Ok(())
    }

    /// `echo` yields one stdout line followed by a successful status.
    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stdout(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// Output redirected to stderr arrives as a stderr item.
    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stderr(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc