• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

lpenz / ogle / 16103139788

06 Jul 2025 09:03PM UTC coverage: 63.678% (+18.2%) from 45.475%
16103139788

push

github

lpenz
Complete refactoring using layers that should be easier to test

Layers connected via streams, which we can mock and test.
This combines a bunch of commits that documented this slow conversion.

374 of 571 new or added lines in 12 files covered. (65.5%)

2 existing lines in 2 files now uncovered.

419 of 658 relevant lines covered (63.68%)

1.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.22
/src/process_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Wrapper for process functions.
6
//!
7
//! # `Cmd`
8
//!
9
//! The [`Cmd`] type has an inner `Vec<String>` that we can turn into
10
//! a [`tokio::process::Command`]. It implements `Clone`, which we use
11
//! to spawn the same process multiple times.
12
//!
13
//! # `ProcessStream`
14
//!
15
//! The [`ProcessStream`] type wraps [`tokio_process_stream`] in order
16
//! to provide an [`Item`] that implements `Eq` which we can then use
17
//! for testing.
18

19
use color_eyre::Result;
20
use pin_project::pin_project;
21
use std::collections::VecDeque;
22
use std::fmt;
23
use std::io;
24
use std::os::unix::process::ExitStatusExt;
25
use std::pin::Pin;
26
use std::process::ExitStatus;
27
use std::process::Stdio;
28
use std::task::{Context, Poll};
29
use tokio::process::Command;
30
use tokio_process_stream as tps;
31
use tokio_stream::Stream;
32
use tracing::instrument;
33

34
// Command wrapper ///////////////////////////////////////////////////
35

36
/// A [`tokio::process::Command`] pseudo-wrapper that `impl Clone`.
37
#[derive(Debug, Default, Clone)]
38
pub struct Cmd(Vec<String>);
39

40
impl From<&Cmd> for Command {
41
    fn from(cmd: &Cmd) -> Command {
7✔
42
        let mut command = Command::new(&cmd.0[0]);
7✔
43
        command.args(cmd.0.iter().skip(1));
7✔
44
        command.stdin(Stdio::null());
7✔
45
        command.stdout(Stdio::piped());
7✔
46
        command.stderr(Stdio::piped());
7✔
47
        command
7✔
48
    }
7✔
49
}
50

51
impl From<Vec<String>> for Cmd {
52
    fn from(s: Vec<String>) -> Cmd {
2✔
53
        Self(s)
2✔
54
    }
2✔
55
}
56

57
impl From<&[&str]> for Cmd {
58
    fn from(s: &[&str]) -> Cmd {
5✔
59
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
8✔
60
    }
5✔
61
}
62

63
impl fmt::Display for Cmd {
64
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1✔
65
        let joined = self.0.join(" ");
1✔
66
        write!(f, "{joined}")
1✔
67
    }
1✔
68
}
69

70
// Exit status ///////////////////////////////////////////////////////
71

72
/// A [`std::process::ExitStatus`] pseudo-wrapper that impl Clone
73
/// and custom Display.
74
#[derive(Debug, Default, Clone, PartialEq, Eq)]
75
pub enum ExitSts {
76
    #[default]
77
    Success,
78
    Code(u8),
79
    Signal(i32),
80
}
81

82
impl ExitSts {
83
    pub fn success(&self) -> bool {
5✔
84
        self == &ExitSts::Success
5✔
85
    }
5✔
86
}
87

88
impl From<ExitStatus> for ExitSts {
89
    fn from(sts: ExitStatus) -> ExitSts {
5✔
90
        if sts.success() {
5✔
91
            ExitSts::Success
4✔
92
        } else if let Some(code) = sts.code() {
1✔
93
            ExitSts::Code(code as u8)
1✔
NEW
94
        } else if let Some(signal) = sts.signal() {
×
NEW
95
            ExitSts::Signal(signal)
×
96
        } else {
NEW
97
            panic!("Unable to figure out exit status {sts:?}")
×
98
        }
99
    }
5✔
100
}
101

102
impl fmt::Display for ExitSts {
NEW
103
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
NEW
104
        match self {
×
NEW
105
            ExitSts::Success => write!(f, "success"),
×
NEW
106
            ExitSts::Code(code) => write!(f, "code {code}"),
×
NEW
107
            ExitSts::Signal(signal) => write!(f, "signal {signal}"),
×
108
        }
NEW
109
    }
×
110
}
111

112
// ProcessStream /////////////////////////////////////////////////////
113

114
/// A clonable, Eq replacement for [`tokio_process_stream::Item`]
115
#[derive(Debug, Clone, PartialEq, Eq)]
116
pub enum Item {
117
    /// A stdout line printed by the process.
118
    Stdout(String),
119
    /// A stderr line printed by the process.
120
    Stderr(String),
121
    /// The [`ExitSts`], yielded after the process exits.
122
    Done(Result<ExitSts, io::ErrorKind>),
123
}
124

125
impl From<tps::Item<String>> for Item {
126
    fn from(item: tps::Item<String>) -> Self {
7✔
127
        match item {
7✔
128
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
129
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
130
            tps::Item::Done(result) => Item::Done(match result {
5✔
131
                Ok(sts) => Ok(sts.into()),
5✔
NEW
132
                Err(err) => Err(err.kind()),
×
133
            }),
134
        }
135
    }
7✔
136
}
137

138
/// A mockable wrapper for
139
/// [`tokio_process_stream::ProcessLineStream`].
140
#[pin_project(project = ProcessStreamProj)]
141
pub enum ProcessStream {
142
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
143
    Real { stream: Box<tps::ProcessLineStream> },
144
    /// Mock for a running process stream that just returns items from
145
    /// a list. Useful for testing.
146
    Virtual { items: VecDeque<Item> },
147
}
148

149
impl std::fmt::Debug for ProcessStream {
150
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
151
        match self {
1✔
NEW
152
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
153
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
1✔
154
        }
155
        .finish()
1✔
156
    }
1✔
157
}
158

159
impl From<tps::ProcessLineStream> for ProcessStream {
160
    fn from(stream: tps::ProcessLineStream) -> Self {
5✔
161
        ProcessStream::Real {
5✔
162
            stream: Box::new(stream),
5✔
163
        }
5✔
164
    }
5✔
165
}
166

167
impl From<VecDeque<Item>> for ProcessStream {
168
    fn from(items: VecDeque<Item>) -> Self {
3✔
169
        ProcessStream::Virtual { items }
3✔
170
    }
3✔
171
}
172

173
impl Stream for ProcessStream {
174
    type Item = Item;
175

176
    #[instrument(level = "debug", ret, skip(cx))]
177
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
23✔
178
        let this = self.project();
179
        match this {
180
            ProcessStreamProj::Real { stream } => {
181
                let next = Pin::new(stream).poll_next(cx);
182
                match next {
183
                    Poll::Ready(opt) => Poll::Ready(opt.map(|i| i.into())),
7✔
184
                    Poll::Pending => Poll::Pending,
185
                }
186
            }
187
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
188
        }
189
    }
23✔
190
}
191

192
// Tests /////////////////////////////////////////////////////////////
193

194
#[cfg(test)]
195
pub mod test {
196
    use color_eyre::Result;
197
    use color_eyre::eyre::eyre;
198
    use tokio_stream::StreamExt;
199

200
    use super::*;
201

202
    async fn stream_cmd(
4✔
203
        cmdstr: &[&str],
4✔
204
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
4✔
205
        let cmd = Cmd::from(cmdstr);
4✔
206
        let process_stream = tps::ProcessLineStream::try_from(Command::from(&cmd))?;
4✔
207
        Ok(ProcessStream::from(process_stream))
4✔
208
    }
4✔
209

210
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
6✔
211
    where
6✔
212
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
6✔
213
    {
6✔
214
        stream.next().await.ok_or(eyre!("no item received"))
6✔
215
    }
6✔
216

217
    #[tokio::test]
218
    async fn test_true() -> Result<()> {
1✔
219
        let mut stream = stream_cmd(&["true"]).await?;
1✔
220
        let item = stream_next(&mut stream).await?;
1✔
221
        let Item::Done(sts) = item else {
1✔
NEW
222
            return Err(eyre!("unexpected stream item {:?}", item));
×
223
        };
224
        assert!(sts.unwrap().success());
1✔
225
        assert!(stream.next().await.is_none());
1✔
226
        Ok(())
2✔
227
    }
1✔
228

229
    #[tokio::test]
230
    async fn test_false() -> Result<()> {
1✔
231
        let mut stream = stream_cmd(&["false"]).await?;
1✔
232
        let item = stream_next(&mut stream).await?;
1✔
233
        let Item::Done(sts) = item else {
1✔
NEW
234
            return Err(eyre!("unexpected stream item {:?}", item));
×
235
        };
236
        assert!(!sts.unwrap().success());
1✔
237
        Ok(())
2✔
238
    }
1✔
239

240
    #[tokio::test]
241
    async fn test_echo() -> Result<()> {
1✔
242
        let mut stream = stream_cmd(&["echo", "test"]).await?;
1✔
243
        let item = stream_next(&mut stream).await?;
1✔
244
        let Item::Stdout(s) = item else {
1✔
NEW
245
            return Err(eyre!("unexpected stream item {:?}", item));
×
246
        };
247
        assert_eq!(s, "test");
1✔
248
        let item = stream_next(&mut stream).await?;
1✔
249
        let Item::Done(sts) = item else {
1✔
NEW
250
            return Err(eyre!("unexpected stream item {:?}", item));
×
251
        };
252
        assert!(sts.unwrap().success());
1✔
253
        Ok(())
2✔
254
    }
1✔
255

256
    #[tokio::test]
257
    async fn test_stderr() -> Result<()> {
1✔
258
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
1✔
259
        let item = stream_next(&mut stream).await?;
1✔
260
        let Item::Stderr(s) = item else {
1✔
NEW
261
            return Err(eyre!("unexpected stream item {:?}", item));
×
262
        };
263
        assert_eq!(s, "test");
1✔
264
        let item = stream_next(&mut stream).await?;
1✔
265
        let Item::Done(sts) = item else {
1✔
NEW
266
            return Err(eyre!("unexpected stream item {:?}", item));
×
267
        };
268
        assert!(sts.unwrap().success());
1✔
269
        Ok(())
2✔
270
    }
1✔
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc