• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

lpenz / ogle / 21967796932

12 Feb 2026 10:59PM UTC coverage: 60.289% (+1.9%) from 58.407%
21967796932

push

github

lpenz
Cargo.*: increment version to 2.3.3

501 of 831 relevant lines covered (60.29%)

1.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.2
/src/process_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.

//! Wrapper for process functions.
//!
//! # `Cmd`
//!
//! The [`Cmd`] type has an inner `Vec<String>` that we can turn into
//! a [`tokio::process::Command`]. It implements `Clone`, which we use
//! to spawn the same process multiple times.
//!
//! # `ProcessStream`
//!
//! The [`ProcessStream`] type wraps [`tokio_process_stream`] in order
//! to provide an [`Item`] that implements `Eq` which we can then use
//! for testing.
18

19
use color_eyre::Result;
20
use nix::sys::signal::Signal;
21
use std::collections::VecDeque;
22
use std::fmt;
23
use std::io;
24
use std::os::unix::process::ExitStatusExt;
25
use std::pin::Pin;
26
use std::process::ExitStatus;
27
use std::process::Stdio;
28
use std::task::{Context, Poll};
29
use tokio::process::Child;
30
use tokio::process::Command;
31
use tokio_process_stream as tps;
32
use tokio_stream::Stream;
33
use tracing::instrument;
34

35
// Command wrapper ///////////////////////////////////////////////////
36

37
/// A [`tokio::process::Command`] pseudo-wrapper that `impl Clone`.
///
/// The inner vector holds the program name as its first element,
/// followed by the arguments passed to that program.
#[derive(Debug, Default, Clone)]
pub struct Cmd(Vec<String>);

impl From<&Cmd> for Command {
    /// Builds a [`Command`] that runs the program stored in `cmd`.
    ///
    /// The first element is used as the program and the remaining
    /// elements as its arguments. Stdin is set to null, while stdout
    /// and stderr are piped.
    ///
    /// # Panics
    ///
    /// Panics if `cmd` is empty (as a `Cmd::default()` is), because
    /// there is no program name to run.
    fn from(cmd: &Cmd) -> Command {
        let (program, args) = cmd
            .0
            .split_first()
            .expect("cannot build a Command from an empty Cmd");
        let mut command = Command::new(program);
        command
            .args(args)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        command
    }
}

impl From<Vec<String>> for Cmd {
    /// Wraps an owned list of strings without copying it.
    fn from(s: Vec<String>) -> Cmd {
        Self(s)
    }
}

impl From<&[&str]> for Cmd {
    /// Copies every string slice into an owned [`Cmd`].
    fn from(s: &[&str]) -> Cmd {
        Self(s.iter().map(|&arg| arg.to_owned()).collect())
    }
}

impl fmt::Display for Cmd {
    /// Writes all elements separated by a single space.
    ///
    /// No quoting or escaping is applied to the elements.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0.join(" "))
    }
}
70

71
// Exit status ///////////////////////////////////////////////////////
72

73
/// A [`std::process::ExitStatus`] pseudo-wrapper that impl Clone
/// and custom Display.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum ExitSts {
    /// The process reported success.
    #[default]
    Success,
    /// The process exited with the given non-success code.
    Code(u8),
    /// The process was terminated by the given signal number.
    Signal(i32),
}

impl ExitSts {
    /// Returns `true` only for [`ExitSts::Success`].
    pub fn success(&self) -> bool {
        matches!(self, ExitSts::Success)
    }
}

impl From<ExitStatus> for ExitSts {
    /// Classifies an [`ExitStatus`] as success, exit code or signal.
    ///
    /// Success is checked first, then the exit code, then the signal.
    ///
    /// # Panics
    ///
    /// Panics if the status is not successful and reports neither an
    /// exit code nor a terminating signal.
    fn from(sts: ExitStatus) -> ExitSts {
        if sts.success() {
            return ExitSts::Success;
        }
        match (sts.code(), sts.signal()) {
            // NOTE(review): truncating cast; assumed lossless because
            // Unix exit codes are expected to fit in 8 bits - confirm.
            (Some(code), _) => ExitSts::Code(code as u8),
            (None, Some(signal)) => ExitSts::Signal(signal),
            (None, None) => panic!("Unable to figure out exit status {:?}", sts),
        }
    }
}
102

103
impl fmt::Display for ExitSts {
104
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4✔
105
        match self {
4✔
106
            ExitSts::Success => write!(f, "success"),
1✔
107
            ExitSts::Code(code) => write!(f, "code {code}"),
1✔
108
            ExitSts::Signal(signal) => {
2✔
109
                if let Ok(s) = Signal::try_from(*signal) {
2✔
110
                    write!(f, "signal {signal} ({s})")
1✔
111
                } else {
112
                    write!(f, "signal {signal}")
1✔
113
                }
114
            }
115
        }
116
    }
4✔
117
}
118

119
// ProcessStream /////////////////////////////////////////////////////
120

121
/// A clonable, Eq replacement for [`tokio_process_stream::Item`]
122
#[derive(Debug, Clone, PartialEq, Eq)]
123
pub enum Item {
124
    /// A stdout line printed by the process.
125
    Stdout(String),
126
    /// A stderr line printed by the process.
127
    Stderr(String),
128
    /// The [`ExitSts`], yielded after the process exits.
129
    Done(Result<ExitSts, io::ErrorKind>),
130
}
131

132
impl From<tps::Item<String>> for Item {
133
    fn from(item: tps::Item<String>) -> Self {
8✔
134
        match item {
8✔
135
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
136
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
137
            tps::Item::Done(result) => Item::Done(match result {
6✔
138
                Ok(sts) => Ok(sts.into()),
6✔
139
                Err(err) => Err(err.kind()),
×
140
            }),
141
        }
142
    }
8✔
143
}
144

145
/// A wrapper for [`tokio_process_stream::ProcessLineStream`].
146
///
147
/// Also provides a virtual implementation for use in tests.
148
pub enum ProcessStream {
149
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
150
    Real { stream: Box<tps::ProcessLineStream> },
151
    /// Mock for a running process stream that just returns items from
152
    /// a list. Useful for testing.
153
    Virtual { items: VecDeque<Item> },
154
}
155

156
impl ProcessStream {
157
    /// Return a mutable reference to the child object
158
    pub fn child_mut(&mut self) -> Option<&mut Child> {
1✔
159
        if let ProcessStream::Real { stream } = self {
1✔
160
            stream.child_mut()
1✔
161
        } else {
162
            None
×
163
        }
164
    }
1✔
165
}
166

167
impl std::fmt::Debug for ProcessStream {
168
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
169
        match self {
1✔
170
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
171
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
1✔
172
        }
173
        .finish()
1✔
174
    }
1✔
175
}
176

177
impl From<tps::ProcessLineStream> for ProcessStream {
178
    fn from(stream: tps::ProcessLineStream) -> Self {
6✔
179
        ProcessStream::Real {
6✔
180
            stream: Box::new(stream),
6✔
181
        }
6✔
182
    }
6✔
183
}
184

185
impl From<VecDeque<Item>> for ProcessStream {
186
    fn from(items: VecDeque<Item>) -> Self {
3✔
187
        ProcessStream::Virtual { items }
3✔
188
    }
3✔
189
}
190

191
impl Stream for ProcessStream {
192
    type Item = Item;
193

194
    #[instrument(level = "debug", ret, skip(cx))]
195
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32✔
196
        let this = self.get_mut();
197
        match this {
198
            ProcessStream::Real { stream } => {
199
                let next = Pin::new(stream).poll_next(cx);
200
                match next {
201
                    Poll::Ready(opt) => Poll::Ready(opt.map(|i| i.into())),
8✔
202
                    Poll::Pending => Poll::Pending,
203
                }
204
            }
205
            ProcessStream::Virtual { items } => Poll::Ready(items.pop_front()),
206
        }
207
    }
32✔
208
}
209

210
// Tests /////////////////////////////////////////////////////////////
211

212
#[cfg(test)]
pub mod test {
    use color_eyre::Result;
    use color_eyre::eyre::eyre;
    use tokio_stream::StreamExt;

    use super::*;

    /// Spawns `cmdstr` and wraps its line stream in a [`ProcessStream`].
    async fn stream_cmd(cmdstr: &[&str]) -> Result<ProcessStream> {
        let cmd = Cmd::from(cmdstr);
        let process_stream = tps::ProcessLineStream::try_from(Command::from(&cmd))?;
        Ok(ProcessStream::from(process_stream))
    }

    /// Returns the next item, or an error if the stream has ended.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + Unpin,
    {
        // Build the error lazily: it is only needed when the stream ended.
        stream.next().await.ok_or_else(|| eyre!("no item received"))
    }

    /// Asserts that the stream yields no further items.
    async fn assert_closed<T>(stream: &mut T)
    where
        T: StreamExt<Item = Item> + Unpin,
    {
        assert_eq!(stream.next().await, None);
    }

    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        assert_eq!(
            stream_next(&mut stream).await?,
            Item::Done(Ok(ExitSts::Success))
        );
        assert!(stream.next().await.is_none());
        assert_closed(&mut stream).await;
        Ok(())
    }

    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        // The exact exit code is not pinned, only that it is a failure.
        assert!(
            matches!(&item, Item::Done(Ok(sts)) if !sts.success()),
            "unexpected stream item {item:?}"
        );
        assert_closed(&mut stream).await;
        Ok(())
    }

    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        assert_eq!(
            stream_next(&mut stream).await?,
            Item::Stdout("test".to_string())
        );
        assert_eq!(
            stream_next(&mut stream).await?,
            Item::Done(Ok(ExitSts::Success))
        );
        assert_closed(&mut stream).await;
        Ok(())
    }

    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        assert_eq!(
            stream_next(&mut stream).await?,
            Item::Stderr("test".to_string())
        );
        assert_eq!(
            stream_next(&mut stream).await?,
            Item::Done(Ok(ExitSts::Success))
        );
        assert_closed(&mut stream).await;
        Ok(())
    }

    #[tokio::test]
    async fn test_kill() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "sleep 9999999"]).await?;
        // Fail fast if the kill cannot be issued, instead of waiting on
        // the very long sleep.
        stream
            .child_mut()
            .ok_or_else(|| eyre!("process stream has no child to kill"))?
            .start_kill()?;
        let item = stream_next(&mut stream).await?;
        assert_eq!(item, Item::Done(Ok(ExitSts::Signal(9))));
        assert_closed(&mut stream).await;
        Ok(())
    }

    #[test]
    fn test_exitsts_display() {
        assert_eq!(ExitSts::Success.to_string(), "success");
        assert_eq!(ExitSts::Code(42).to_string(), "code 42");
        assert_eq!(ExitSts::Signal(1).to_string(), "signal 1 (SIGHUP)");
        assert_eq!(ExitSts::Signal(12345).to_string(), "signal 12345");
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc