• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 15810912669

22 Jun 2025 09:20PM UTC coverage: 60.303% (+0.4%) from 59.94%
15810912669

push

github

lpenz
Add documentation and rename a bunch of files

11 of 15 new or added lines in 5 files covered. (73.33%)

2 existing lines in 2 files now uncovered.

398 of 660 relevant lines covered (60.3%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/src/process_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Wrapper for process functions.
6
//!
7
//! # [`Cmd`]
8
//!
9
//! `Cmd` has an inner `Vec<String>` that we can turn into a
10
//! [`tokio::process::Command`]. It implements `Clone`, which we use
11
//! to spawn the same process multiple times.
12
//!
13
//! # [`ProcessStream`]
14
//!
15
//! `ProcessStream` wraps [`tokio_process_stream`] in order to provide
16
//! an [`Item`] that implements `Eq` which we can then use for
17
//! testing.
18

19
use color_eyre::Result;
20
use pin_project::pin_project;
21
use std::collections::VecDeque;
22
use std::fmt;
23
use std::io;
24
use std::pin::Pin;
25
use std::process::ExitStatus;
26
use std::process::Stdio;
27
use std::task::{Context, Poll};
28
use tokio::process::Command;
29
use tokio_process_stream as tps;
30
use tokio_stream::Stream;
31
use tracing::instrument;
32

33
// Command wrapper ///////////////////////////////////////////////////
34

35
/// Owned list of command-line words: the program followed by its
/// arguments.
///
/// Unlike [`tokio::process::Command`], this type is `Clone`, so the
/// same command line can be kept around and converted into a fresh
/// `Command` whenever it is needed.
#[derive(Clone, Debug, Default)]
pub struct Cmd(Vec<String>);
38

39
impl From<&Cmd> for Command {
40
    fn from(cmd: &Cmd) -> Command {
6✔
41
        let mut command = Command::new(&cmd.0[0]);
6✔
42
        command.args(cmd.0.iter().skip(1));
6✔
43
        command.stdin(Stdio::null());
6✔
44
        command.stdout(Stdio::piped());
6✔
45
        command.stderr(Stdio::piped());
6✔
46
        command
6✔
47
    }
6✔
48
}
49

50
impl From<Vec<String>> for Cmd {
51
    fn from(s: Vec<String>) -> Cmd {
2✔
52
        Self(s)
2✔
53
    }
2✔
54
}
55

56
impl From<&[&str]> for Cmd {
57
    fn from(s: &[&str]) -> Cmd {
4✔
58
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
59
    }
4✔
60
}
61

62
impl fmt::Display for Cmd {
63
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1✔
64
        let joined = self.0.join(" ");
1✔
65
        write!(f, "{}", joined)
1✔
66
    }
1✔
67
}
68

69
// ProcessStream /////////////////////////////////////////////////////
70

71
/// Counterpart of [`tokio_process_stream::Item`] that is `Clone` and
/// `Eq`, so items can be copied and compared directly.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Item {
    /// One line the process wrote to stdout.
    Stdout(String),
    /// One line the process wrote to stderr.
    Stderr(String),
    /// Yielded after the process exits: its [`ExitStatus`], or the
    /// kind of the I/O error that was reported instead.
    Done(Result<ExitStatus, io::ErrorKind>),
}
81

82
impl From<tps::Item<String>> for Item {
    /// Converts an item produced by `tokio_process_stream` into our
    /// comparable [`Item`].
    ///
    /// Output lines are moved over unchanged. For the final item, an
    /// I/O error is reduced to its [`io::ErrorKind`].
    fn from(item: tps::Item<String>) -> Self {
        match item {
            tps::Item::Stdout(line) => Self::Stdout(line),
            tps::Item::Stderr(line) => Self::Stderr(line),
            tps::Item::Done(status) => {
                let status = status.map_err(|err| err.kind());
                Self::Done(status)
            }
        }
    }
}
91

92
/// A mockable wrapper for
93
/// [`tokio_process_stream::ProcessLineStream`].
UNCOV
94
#[pin_project(project = ProcessStreamProj)]
×
95
pub enum ProcessStream {
96
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
97
    Real { stream: Box<tps::ProcessLineStream> },
98
    /// Mock for a running process stream that just returns items from
99
    /// a list. Useful for testing.
100
    Virtual { items: VecDeque<Item> },
101
}
102

103
impl std::fmt::Debug for ProcessStream {
104
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
105
        match self {
1✔
106
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
107
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
1✔
108
        }
109
        .finish()
1✔
110
    }
1✔
111
}
112

113
impl From<tps::ProcessLineStream> for ProcessStream {
114
    fn from(stream: tps::ProcessLineStream) -> Self {
4✔
115
        ProcessStream::Real {
4✔
116
            stream: Box::new(stream),
4✔
117
        }
4✔
118
    }
4✔
119
}
120

121
impl From<VecDeque<Item>> for ProcessStream {
122
    fn from(items: VecDeque<Item>) -> Self {
3✔
123
        ProcessStream::Virtual { items }
3✔
124
    }
3✔
125
}
126

127
impl Stream for ProcessStream {
128
    type Item = Item;
129

130
    #[instrument(level = "debug", ret, skip(cx))]
131
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
20✔
132
        let this = self.project();
133
        match this {
134
            ProcessStreamProj::Real { stream } => {
135
                let next = Pin::new(stream).poll_next(cx);
136
                match next {
137
                    Poll::Ready(opt) => Poll::Ready(opt.map(|i| i.into())),
6✔
138
                    Poll::Pending => Poll::Pending,
139
                }
140
            }
141
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
142
        }
143
    }
20✔
144
}
145

146
// Tests /////////////////////////////////////////////////////////////
147

148
#[cfg(test)]
pub mod test {
    use color_eyre::Result;
    use color_eyre::eyre::eyre;
    use tokio_stream::StreamExt;

    use super::*;

    /// Starts the command given by `cmdstr` (program followed by its
    /// arguments) and returns its output as a stream of [`Item`]s.
    async fn stream_cmd(
        cmdstr: &[&str],
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
        let cmd = Cmd::from(cmdstr);
        let process_stream = tps::ProcessLineStream::try_from(Command::from(&cmd))?;
        Ok(ProcessStream::from(process_stream))
    }

    /// Awaits the next item of `stream`, reporting an error if the
    /// stream has already ended.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
    {
        // `ok_or_else` builds the error report only when the stream is
        // actually exhausted, instead of on every call.
        stream
            .next()
            .await
            .ok_or_else(|| eyre!("no item received"))
    }

    /// `true` prints nothing and exits successfully, after which the
    /// stream ends.
    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// `false` prints nothing and exits with a failure status.
    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(!sts.unwrap().success());
        Ok(())
    }

    /// A line written to stdout arrives as `Item::Stdout`, followed by
    /// a successful `Item::Done`.
    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stdout(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// A line written to stderr arrives as `Item::Stderr`, followed by
    /// a successful `Item::Done`.
    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stderr(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc