• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 14070842831

25 Mar 2025 09:54PM UTC coverage: 79.029% (+0.2%) from 78.782%
14070842831

push

github

lpenz
InputItems now include the time, coming from sys_input

16 of 25 new or added lines in 2 files covered. (64.0%)

8 existing lines in 1 file now uncovered.

407 of 515 relevant lines covered (79.03%)

2.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/src/sys_input.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Module that wraps system functions used as inputs
6
//!
7
//! Wrapping this makes it very easy to test the whole program.
8

9
use color_eyre::Result;
10
use pin_project_lite::pin_project;
11
use std::collections::VecDeque;
12
use std::io;
13
use std::pin::Pin;
14
use std::process::ExitStatus;
15
use std::process::Stdio;
16
use std::task::{Context, Poll};
17
use tokio::process::Command;
18
use tokio_process_stream as tps;
19
use tokio_stream::Stream;
20

21
use crate::time_wrapper::Instant;
22

23
//////////////////////////////////////////////////////////////////////////////
24

25
/// A [`tokio::process::Command`] pseudo-wrapper that `impl Clone`.
26
#[derive(Debug, Default, Clone)]
27
pub struct Cmd(Vec<String>);
28

29
impl Cmd {
30
    pub fn get_command(self) -> Command {
8✔
31
        let mut command = Command::new(&self.0[0]);
8✔
32
        command.args(self.0.iter().skip(1));
8✔
33
        command.stdin(Stdio::null());
8✔
34
        command.stdout(Stdio::piped());
8✔
35
        command.stderr(Stdio::piped());
8✔
36
        command
8✔
37
    }
8✔
38
}
39

40
impl From<Vec<String>> for Cmd {
41
    fn from(s: Vec<String>) -> Cmd {
4✔
42
        Self(s)
4✔
43
    }
4✔
44
}
45

46
impl From<&[&str]> for Cmd {
47
    fn from(s: &[&str]) -> Cmd {
4✔
48
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
49
    }
4✔
50
}
51

52
//////////////////////////////////////////////////////////////////////////////
53

54
/// A clonable, comparable counterpart of [`tokio_process_stream::Item`].
///
/// The error side of [`Item::Done`] is reduced to an [`io::ErrorKind`],
/// which is what allows this type to derive `Clone`, `PartialEq` and `Eq`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Item {
    /// One line the process wrote to its stdout.
    Stdout(String),
    /// One line the process wrote to its stderr.
    Stderr(String),
    /// The [`ExitStatus`](std::process::ExitStatus) of the process (or the
    /// kind of the error obtained instead), yielded after the process exits.
    Done(Result<ExitStatus, io::ErrorKind>),
}
64

65
/// Converts a line-stream item from `tokio_process_stream` into the
/// local [`Item`].
///
/// Output lines are moved over unchanged; an error in the final status
/// is replaced by its [`io::ErrorKind`].
impl From<tps::Item<String>> for Item {
    fn from(item: tps::Item<String>) -> Self {
        match item {
            tps::Item::Stdout(line) => Self::Stdout(line),
            tps::Item::Stderr(line) => Self::Stderr(line),
            tps::Item::Done(Ok(status)) => Self::Done(Ok(status)),
            tps::Item::Done(Err(err)) => Self::Done(Err(err.kind())),
        }
    }
}
74

75
pin_project! {
    /// A mockable stand-in for [`tokio_process_stream::ProcessLineStream`].
    ///
    /// Either variant can be polled as a stream of [`Item`]s.
    #[project = ProcessStreamProj]
    pub enum ProcessStream {
        /// Holds an actual [`tokio_process_stream::ProcessLineStream`].
        Real { stream: tps::ProcessLineStream },
        /// Test double for a running process: it simply hands out the
        /// queued items, front first.
        Virtual { items: VecDeque<Item> },
    }
}
86

87
impl From<tps::ProcessLineStream> for ProcessStream {
88
    fn from(stream: tps::ProcessLineStream) -> Self {
6✔
89
        ProcessStream::Real { stream }
6✔
90
    }
6✔
91
}
92

93
impl From<VecDeque<Item>> for ProcessStream {
94
    fn from(items: VecDeque<Item>) -> Self {
1✔
95
        ProcessStream::Virtual { items }
1✔
96
    }
1✔
97
}
98

99
impl Stream for ProcessStream {
100
    type Item = Item;
101

102
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
25✔
103
        let this = self.project();
25✔
104
        match this {
25✔
105
            ProcessStreamProj::Real { stream } => {
21✔
106
                let next = Pin::new(stream).poll_next(cx);
21✔
107
                if let Poll::Ready(opt) = next {
21✔
108
                    Poll::Ready(opt.map(|i| i.into()))
9✔
109
                } else {
110
                    Poll::Pending
12✔
111
                }
112
            }
113
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
4✔
114
        }
115
    }
25✔
116
}
117

118
//////////////////////////////////////////////////////////////////////////////
119

120
/// Wrap the system functions we use as inputs.
121
///
122
/// This wrapper makes testing easy.
123
pub trait SysInputApi: std::fmt::Debug + Clone + Default {
124
    fn now(&self) -> Instant;
125
    fn run_command(&mut self, command: Cmd) -> Result<ProcessStream, std::io::Error>;
126
}
127

128
/// [`SysInputApi`] implementation of the real environment.
129
#[derive(Debug, Clone, Default)]
130
pub struct SysInputReal {}
131

132
impl SysInputApi for SysInputReal {
133
    fn now(&self) -> Instant {
10✔
134
        Instant::from(chrono::offset::Utc::now())
10✔
135
    }
10✔
136
    fn run_command(&mut self, cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
6✔
137
        let process_stream = tps::ProcessLineStream::try_from(cmd.get_command())?;
6✔
138
        Ok(ProcessStream::from(process_stream))
6✔
139
    }
6✔
140
}
141

142
/// [`SysInputApi`] implementation of a virtual environment, to be used in tests.
143
#[derive(Debug, Clone, Default)]
144
pub struct SysInputVirtual {
145
    items: VecDeque<Item>,
146
}
147

148
impl SysInputApi for SysInputVirtual {
NEW
149
    fn now(&self) -> Instant {
×
NEW
150
        Default::default()
×
NEW
151
    }
×
152
    fn run_command(&mut self, _cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
1✔
153
        let items = std::mem::take(&mut self.items);
1✔
154
        Ok(ProcessStream::from(items))
1✔
155
    }
1✔
156
}
157

158
#[cfg(test)]
impl SysInputVirtual {
    /// Replaces the stored items with `items`, keeping their order.
    pub fn set_items(&mut self, items: Vec<Item>) {
        self.items = VecDeque::from(items);
    }
}
164

165
#[cfg(test)]
pub mod test {
    use color_eyre::eyre::eyre;
    use color_eyre::Result;
    use tokio_stream::StreamExt;

    use crate::sys_input::SysInputReal;
    use crate::sys_input::SysInputVirtual;

    use super::*;

    // Tests for SysInputReal with simple unix bins as we don't cover
    // it in downstream tests

    /// Runs `cmdstr` through a [`SysInputReal`] and returns the
    /// resulting stream, turning an I/O error into an eyre report.
    async fn stream_cmd(
        cmdstr: &[&str],
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
        let cmd = Cmd::from(cmdstr);
        let mut sys = SysInputReal::default();
        sys.run_command(cmd).map_err(|e| eyre!(e))
    }

    /// Awaits the next item of `stream`, failing if the stream has ended.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
    {
        // Build the error report lazily: it is only needed when the
        // stream is exhausted.
        stream
            .next()
            .await
            .ok_or_else(|| eyre!("no item received"))
    }

    /// `true` prints nothing and exits successfully, after which the
    /// stream ends.
    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// `false` prints nothing and exits with a failure status.
    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(!sts.unwrap().success());
        Ok(())
    }

    /// `echo test` yields one stdout line and then a successful status.
    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stdout(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// A shell command that echoes to stderr yields one stderr line and
    /// then a successful status.
    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stderr(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// Two consecutive readings of the real clock must not go backwards.
    #[test]
    fn test_now() {
        let sys = SysInputReal::default();
        let now = sys.now();
        let now2 = sys.now();
        assert!(now2 >= now);
    }

    // A simple test for SysInputVirtual as we cover it better in
    // downstream tests

    /// The virtual environment streams back exactly the items it was
    /// given, in order.
    #[tokio::test]
    async fn test_sysinputvirtual() -> Result<()> {
        let list = vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitStatus::default())),
        ];
        let mut sys = SysInputVirtual::default();
        sys.set_items(list.clone());
        let streamer = sys.run_command(Cmd::default())?;
        let streamed = streamer.collect::<Vec<_>>().await;
        assert_eq!(streamed, list);
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc