• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 15799519566

21 Jun 2025 08:49PM UTC coverage: 59.94% (-0.6%) from 60.538%
15799519566

push

github

lpenz
Move process wrapping code to process_wrapper.rs

104 of 107 new or added lines in 2 files covered. (97.2%)

4 existing lines in 1 file now uncovered.

401 of 669 relevant lines covered (59.94%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/src/process_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Wrapper for process functions
6
//!
7
//! First we have [`Cmd`], with an inner `Vec<String>` that we can
8
//! turn into a [`tokio::process::Command`]. It implements `Clone`,
9
//! which we use to spawn the same process multiple times.
10
//!
11
//! We then wrap [`tokio_process_stream`] in order to provide an
12
//! [`Item`] that implements `Eq` which we can then use for testing.
13

14
use color_eyre::Result;
15
use pin_project::pin_project;
16
use std::collections::VecDeque;
17
use std::fmt;
18
use std::io;
19
use std::pin::Pin;
20
use std::process::ExitStatus;
21
use std::process::Stdio;
22
use std::task::{Context, Poll};
23
use tokio::process::Command;
24
use tokio_process_stream as tps;
25
use tokio_stream::Stream;
26
use tracing::instrument;
27

28
// Command wrapper ///////////////////////////////////////////////////////////
29

30
/// Cloneable description of a command line.
///
/// Stores the program name followed by its arguments as plain strings, so
/// that a fresh [`tokio::process::Command`] can be built from it each time
/// the same process has to be spawned again.
#[derive(Clone, Debug, Default)]
pub struct Cmd(Vec<String>);
33

34
impl From<&Cmd> for Command {
35
    fn from(cmd: &Cmd) -> Command {
6✔
36
        let mut command = Command::new(&cmd.0[0]);
6✔
37
        command.args(cmd.0.iter().skip(1));
6✔
38
        command.stdin(Stdio::null());
6✔
39
        command.stdout(Stdio::piped());
6✔
40
        command.stderr(Stdio::piped());
6✔
41
        command
6✔
42
    }
6✔
43
}
44

45
impl From<Vec<String>> for Cmd {
46
    fn from(s: Vec<String>) -> Cmd {
2✔
47
        Self(s)
2✔
48
    }
2✔
49
}
50

51
impl From<&[&str]> for Cmd {
52
    fn from(s: &[&str]) -> Cmd {
4✔
53
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
54
    }
4✔
55
}
56

57
impl fmt::Display for Cmd {
58
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1✔
59
        let joined = self.0.join(" ");
1✔
60
        write!(f, "{}", joined)
1✔
61
    }
1✔
62
}
63

64
// Process stream wrapper ////////////////////////////////////////////////////
65

66
/// Owned counterpart of [`tokio_process_stream::Item`] that can be cloned
/// and compared for equality.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Item {
    /// One line the process printed to its standard output.
    Stdout(String),
    /// One line the process printed to its standard error.
    Stderr(String),
    /// The [`ExitStatus`](std::process::ExitStatus), yielded after the
    /// process exits; on failure only the [`io::ErrorKind`] is carried,
    /// since that is what can be compared for equality.
    Done(std::result::Result<ExitStatus, io::ErrorKind>),
}
76

77
impl From<tps::Item<String>> for Item {
    /// Converts an item produced by `tokio_process_stream` into the local,
    /// comparable [`Item`].
    fn from(item: tps::Item<String>) -> Self {
        match item {
            tps::Item::Stdout(line) => Self::Stdout(line),
            tps::Item::Stderr(line) => Self::Stderr(line),
            tps::Item::Done(outcome) => {
                // Keep only the kind of the I/O error: that is what the
                // local `Done` variant stores.
                let outcome = outcome.map_err(|err| err.kind());
                Self::Done(outcome)
            }
        }
    }
}
86

87
/// A mockable wrapper for [`tokio_process_stream::ProcessLineStream`].
NEW
88
#[pin_project(project = ProcessStreamProj)]
×
89
pub enum ProcessStream {
90
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
91
    Real { stream: Box<tps::ProcessLineStream> },
92
    /// Mock for a running process stream that just returns items from
93
    /// a list. Useful for testing.
94
    Virtual { items: VecDeque<Item> },
95
}
96

97
impl std::fmt::Debug for ProcessStream {
98
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
99
        match self {
1✔
NEW
100
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
101
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
1✔
102
        }
103
        .finish()
1✔
104
    }
1✔
105
}
106

107
impl From<tps::ProcessLineStream> for ProcessStream {
108
    fn from(stream: tps::ProcessLineStream) -> Self {
4✔
109
        ProcessStream::Real {
4✔
110
            stream: Box::new(stream),
4✔
111
        }
4✔
112
    }
4✔
113
}
114

115
impl From<VecDeque<Item>> for ProcessStream {
116
    fn from(items: VecDeque<Item>) -> Self {
3✔
117
        ProcessStream::Virtual { items }
3✔
118
    }
3✔
119
}
120

121
impl Stream for ProcessStream {
122
    type Item = Item;
123

124
    #[instrument(level = "debug", ret, skip(cx))]
125
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
21✔
126
        let this = self.project();
127
        match this {
128
            ProcessStreamProj::Real { stream } => {
129
                let next = Pin::new(stream).poll_next(cx);
130
                match next {
131
                    Poll::Ready(opt) => Poll::Ready(opt.map(|i| i.into())),
6✔
132
                    Poll::Pending => Poll::Pending,
133
                }
134
            }
135
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
136
        }
137
    }
21✔
138
}
139

140
#[cfg(test)]
/// Tests that spawn small real commands (`true`, `false`, `echo`,
/// `/bin/sh`) and check the items yielded by [`ProcessStream`].
pub mod test {
    use color_eyre::Result;
    use color_eyre::eyre::eyre;
    use tokio_stream::StreamExt;

    use super::*;

    /// Spawns `cmdstr` and returns its output as a [`ProcessStream`].
    ///
    /// Fails if the process stream cannot be created from the command.
    async fn stream_cmd(
        cmdstr: &[&str],
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
        let cmd = Cmd::from(cmdstr);
        let process_stream = tps::ProcessLineStream::try_from(Command::from(&cmd))?;
        Ok(ProcessStream::from(process_stream))
    }

    /// Awaits the next item of `stream`, turning an exhausted stream into
    /// an error.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
    {
        // `ok_or_else` so the error report is only built when the stream
        // really ended, not on every successful call.
        stream
            .next()
            .await
            .ok_or_else(|| eyre!("no item received"))
    }

    /// `true` prints nothing and exits successfully; the stream then ends.
    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// `false` prints nothing and exits with a failure status.
    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(!sts.unwrap().success());
        Ok(())
    }

    /// `echo test` yields one stdout line followed by a successful exit.
    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stdout(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// Output redirected to stderr is reported as a stderr line, followed
    /// by a successful exit.
    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stderr(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc