• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 15799464343

21 Jun 2025 08:41PM UTC coverage: 59.94% (-0.6%) from 60.538%
15799464343

push

github

lpenz
Move process wrapping code to process_wrapper.rs

104 of 107 new or added lines in 2 files covered. (97.2%)

4 existing lines in 1 file now uncovered.

401 of 669 relevant lines covered (59.94%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/src/process_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Wrapper for process functions
6

7
use color_eyre::Result;
8
use pin_project::pin_project;
9
use std::collections::VecDeque;
10
use std::fmt;
11
use std::io;
12
use std::pin::Pin;
13
use std::process::ExitStatus;
14
use std::process::Stdio;
15
use std::task::{Context, Poll};
16
use tokio::process::Command;
17
use tokio_process_stream as tps;
18
use tokio_stream::Stream;
19
use tracing::instrument;
20

21
// Command wrapper ///////////////////////////////////////////////////////////
22

23
/// Clonable description of a command line that can be turned into a
/// [`tokio::process::Command`].
///
/// The wrapped vector holds the program name at index 0, followed by its
/// arguments. It exists because this type derives `Clone`, so the same
/// command description can be kept around and converted into a fresh
/// [`tokio::process::Command`] whenever one is needed.
#[derive(Debug, Default, Clone)]
pub struct Cmd(Vec<String>);
26

27
impl From<&Cmd> for Command {
28
    fn from(cmd: &Cmd) -> Command {
6✔
29
        let mut command = Command::new(&cmd.0[0]);
6✔
30
        command.args(cmd.0.iter().skip(1));
6✔
31
        command.stdin(Stdio::null());
6✔
32
        command.stdout(Stdio::piped());
6✔
33
        command.stderr(Stdio::piped());
6✔
34
        command
6✔
35
    }
6✔
36
}
37

38
impl From<Vec<String>> for Cmd {
39
    fn from(s: Vec<String>) -> Cmd {
2✔
40
        Self(s)
2✔
41
    }
2✔
42
}
43

44
impl From<&[&str]> for Cmd {
45
    fn from(s: &[&str]) -> Cmd {
4✔
46
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
47
    }
4✔
48
}
49

50
impl fmt::Display for Cmd {
51
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1✔
52
        let joined = self.0.join(" ");
1✔
53
        write!(f, "{}", joined)
1✔
54
    }
1✔
55
}
56

57
//////////////////////////////////////////////////////////////////////////////
58

59
/// A clonable, comparable replacement for [`tokio_process_stream::Item`].
///
/// Every variant only holds values that implement `Clone`, `PartialEq` and
/// `Eq`, which is what allows those traits to be derived here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Item {
    /// A stdout line printed by the process.
    Stdout(String),
    /// A stderr line printed by the process.
    Stderr(String),
    /// The [`ExitStatus`](std::process::ExitStatus), yielded after the
    /// process exits. On failure only the [`io::ErrorKind`] is kept.
    Done(Result<ExitStatus, io::ErrorKind>),
}
69

70
impl From<tps::Item<String>> for Item {
71
    fn from(item: tps::Item<String>) -> Self {
6✔
72
        match item {
6✔
73
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
74
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
75
            tps::Item::Done(result) => Item::Done(result.map_err(|e| e.kind())),
4✔
76
        }
77
    }
6✔
78
}
79

80
/// A mockable wrapper for [`tokio_process_stream::ProcessLineStream`].
NEW
81
#[pin_project(project = ProcessStreamProj)]
×
82
pub enum ProcessStream {
83
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
84
    Real { stream: Box<tps::ProcessLineStream> },
85
    /// Mock for a running process stream that just returns items from
86
    /// a list. Useful for testing.
87
    Virtual { items: VecDeque<Item> },
88
}
89

90
impl std::fmt::Debug for ProcessStream {
91
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
92
        match self {
1✔
NEW
93
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
94
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
1✔
95
        }
96
        .finish()
1✔
97
    }
1✔
98
}
99

100
impl From<tps::ProcessLineStream> for ProcessStream {
101
    fn from(stream: tps::ProcessLineStream) -> Self {
4✔
102
        ProcessStream::Real {
4✔
103
            stream: Box::new(stream),
4✔
104
        }
4✔
105
    }
4✔
106
}
107

108
impl From<VecDeque<Item>> for ProcessStream {
109
    fn from(items: VecDeque<Item>) -> Self {
3✔
110
        ProcessStream::Virtual { items }
3✔
111
    }
3✔
112
}
113

114
impl Stream for ProcessStream {
115
    type Item = Item;
116

117
    #[instrument(level = "debug", ret, skip(cx))]
118
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
21✔
119
        let this = self.project();
120
        match this {
121
            ProcessStreamProj::Real { stream } => {
122
                let next = Pin::new(stream).poll_next(cx);
123
                match next {
124
                    Poll::Ready(opt) => Poll::Ready(opt.map(|i| i.into())),
6✔
125
                    Poll::Pending => Poll::Pending,
126
                }
127
            }
128
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
129
        }
130
    }
21✔
131
}
132

133
/// Tests that run small real commands (`true`, `false`, `echo`, `/bin/sh`)
/// through [`ProcessStream`] and check the items it yields.
#[cfg(test)]
pub mod test {
    use color_eyre::Result;
    use color_eyre::eyre::eyre;
    use tokio_stream::StreamExt;

    use super::*;

    /// Starts the command described by `cmdstr` and returns its output as
    /// a stream of [`Item`]s.
    async fn stream_cmd(
        cmdstr: &[&str],
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
        let cmd = Cmd::from(cmdstr);
        let process_stream = tps::ProcessLineStream::try_from(Command::from(&cmd))?;
        Ok(ProcessStream::from(process_stream))
    }

    /// Awaits the next item of `stream`, turning end-of-stream into an
    /// error.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
    {
        // `ok_or_else` so the error report is only built when the stream
        // has actually ended, not on every successful call.
        stream
            .next()
            .await
            .ok_or_else(|| eyre!("no item received"))
    }

    /// `true` prints nothing and exits successfully; the stream then ends.
    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// `false` prints nothing and exits with a failure status.
    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(!sts.unwrap().success());
        Ok(())
    }

    /// A line written to stdout arrives as `Item::Stdout` before the exit
    /// status.
    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stdout(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// A line written to stderr arrives as `Item::Stderr` before the exit
    /// status.
    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stderr(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc