• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 14344483017

08 Apr 2025 10:05PM UTC coverage: 79.933%. Remained the same
14344483017

push

github

lpenz
Implement a couple of tests for input_stream

61 of 61 new or added lines in 2 files covered. (100.0%)

7 existing lines in 2 files now uncovered.

478 of 598 relevant lines covered (79.93%)

3.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.92
/src/sys_input.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Module that wraps system functions used as inputs
6
//!
7
//! Wrapping this makes it very easy to test the whole program.
8

9
use color_eyre::Result;
10
use pin_project_lite::pin_project;
11
use std::cell::RefCell;
12
use std::collections::VecDeque;
13
use std::io;
14
use std::pin::Pin;
15
use std::process::ExitStatus;
16
use std::process::Stdio;
17
use std::task::{Context, Poll};
18
use tokio::process::Command;
19
use tokio_process_stream as tps;
20
use tokio_stream::Stream;
21

22
use crate::term_wrapper;
23
use crate::time_wrapper::Duration;
24
use crate::time_wrapper::Instant;
25

26
//////////////////////////////////////////////////////////////////////////////
27

28
/// A [`tokio::process::Command`] pseudo-wrapper that `impl Clone`.
///
/// The wrapped vector holds the program name in its first element and
/// the program arguments in the remaining ones.
#[derive(Debug, Default, Clone)]
pub struct Cmd(Vec<String>);

impl Cmd {
    /// Build the [`Command`] described by this `Cmd`.
    ///
    /// The returned command has stdin set to null and both stdout and
    /// stderr piped.
    ///
    /// An empty `Cmd` (for instance the one built by `Cmd::default()`)
    /// yields a command with an empty program name instead of panicking
    /// here, leaving it to the spawn step to report the problem.
    pub fn get_command(self) -> Command {
        let mut words = self.0.iter();
        // `Cmd` derives `Default`, so the vector may be empty: fall back
        // to an empty program name rather than indexing out of bounds.
        let program = words.next().map(String::as_str).unwrap_or("");
        let mut command = Command::new(program);
        // Whatever is left in the iterator are the program arguments.
        command.args(words);
        command.stdin(Stdio::null());
        command.stdout(Stdio::piped());
        command.stderr(Stdio::piped());
        command
    }
}
42

43
impl From<Vec<String>> for Cmd {
44
    fn from(s: Vec<String>) -> Cmd {
4✔
45
        Self(s)
4✔
46
    }
4✔
47
}
48

49
impl From<&[&str]> for Cmd {
50
    fn from(s: &[&str]) -> Cmd {
4✔
51
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
52
    }
4✔
53
}
54

55
//////////////////////////////////////////////////////////////////////////////
56

57
/// A clonable, PartialEq wrapper for [`tokio_process_stream::Item`]
58
#[derive(Debug, Clone, PartialEq, Eq)]
59
pub enum Item {
60
    /// A stdout line printed by the process.
61
    Stdout(String),
62
    /// A stderr line printed by the process.
63
    Stderr(String),
64
    /// The [`ExitStatus`](std::process::ExitStatus), yielded after the process exits.
65
    Done(Result<ExitStatus, io::ErrorKind>),
66
}
67

68
impl From<tps::Item<String>> for Item {
69
    fn from(item: tps::Item<String>) -> Self {
8✔
70
        match item {
8✔
71
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
72
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
73
            tps::Item::Done(result) => Item::Done(result.map_err(|e| e.kind())),
6✔
74
        }
75
    }
8✔
76
}
77

78
pin_project! {
79
/// A mockable wrapper for [`tokio_process_stream::ProcessLineStream`].
80
#[project = ProcessStreamProj]
81
pub enum ProcessStream {
82
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
83
    Real { stream: tps::ProcessLineStream},
84
    /// Mock for a running process stream that just returns items from
85
    /// a list. Useful for testing.
86
    Virtual { items: VecDeque<Item> },
87
}
88
}
89

90
impl From<tps::ProcessLineStream> for ProcessStream {
91
    fn from(stream: tps::ProcessLineStream) -> Self {
6✔
92
        ProcessStream::Real { stream }
6✔
93
    }
6✔
94
}
95

96
impl From<VecDeque<Item>> for ProcessStream {
97
    fn from(items: VecDeque<Item>) -> Self {
3✔
98
        ProcessStream::Virtual { items }
3✔
99
    }
3✔
100
}
101

102
impl Stream for ProcessStream {
103
    type Item = Item;
104

105
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31✔
106
        let this = self.project();
31✔
107
        match this {
31✔
108
            ProcessStreamProj::Real { stream } => {
21✔
109
                let next = Pin::new(stream).poll_next(cx);
21✔
110
                if let Poll::Ready(opt) = next {
21✔
111
                    Poll::Ready(opt.map(|i| i.into()))
9✔
112
                } else {
113
                    Poll::Pending
12✔
114
                }
115
            }
116
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
10✔
117
        }
118
    }
31✔
119
}
120

121
//////////////////////////////////////////////////////////////////////////////
122

123
/// Wrap the system functions we use as inputs.
124
///
125
/// This wrapper makes testing easy.
126
pub trait SysInputApi: std::fmt::Debug + Clone + Default {
127
    fn now(&self) -> Instant;
128
    #[allow(dead_code)]
129
    fn size_checked(&self) -> Option<(u16, u16)>;
130
    fn run_command(&mut self, command: Cmd) -> Result<ProcessStream, std::io::Error>;
131
}
132

133
/// [`SysInputApi`] implementation of the real environment.
134
#[derive(Debug, Clone, Default)]
135
pub struct SysInputReal {}
136

137
impl SysInputApi for SysInputReal {
138
    fn now(&self) -> Instant {
10✔
139
        Instant::from(chrono::offset::Utc::now())
10✔
140
    }
10✔
UNCOV
141
    fn size_checked(&self) -> Option<(u16, u16)> {
×
UNCOV
142
        term_wrapper::size_checked()
×
143
    }
×
144
    fn run_command(&mut self, cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
6✔
145
        let process_stream = tps::ProcessLineStream::try_from(cmd.get_command())?;
6✔
146
        Ok(ProcessStream::from(process_stream))
6✔
147
    }
6✔
148
}
149

150
/// [`SysInputApi`] implementation of a virtual environment, to be used in tests.
151
#[derive(Debug, Clone, Default)]
152
pub struct SysInputVirtual {
153
    now: RefCell<Instant>,
154
    items: VecDeque<Item>,
155
}
156

157
impl SysInputApi for SysInputVirtual {
158
    fn now(&self) -> Instant {
8✔
159
        let mut now_ref = self.now.borrow_mut();
8✔
160
        let now = *now_ref;
8✔
161
        *now_ref = &now + &Duration::seconds(1);
8✔
162
        now
8✔
163
    }
8✔
164
    fn size_checked(&self) -> Option<(u16, u16)> {
1✔
165
        Some((25, 80))
1✔
166
    }
1✔
167
    fn run_command(&mut self, _cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
3✔
168
        let items = std::mem::take(&mut self.items);
3✔
169
        Ok(ProcessStream::from(items))
3✔
170
    }
3✔
171
}
172

173
#[cfg(test)]
impl SysInputVirtual {
    /// Replace the items that the next `run_command` call will stream.
    pub fn set_items(&mut self, items: Vec<Item>) {
        self.items = VecDeque::from(items);
    }
}
179

180
#[cfg(test)]
181
pub mod test {
182
    use color_eyre::eyre::eyre;
183
    use color_eyre::Result;
184
    use tokio_stream::StreamExt;
185

186
    use crate::sys_input::SysInputReal;
187
    use crate::sys_input::SysInputVirtual;
188

189
    use super::*;
190

191
    // Tests for SysInputReal with simple unix bins as we don't cover
192
    // it in downstream tests
193

194
    async fn stream_cmd(
4✔
195
        cmdstr: &[&str],
4✔
196
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
4✔
197
        let cmd = Cmd::from(cmdstr);
4✔
198
        let mut sys = SysInputReal::default();
4✔
199
        sys.run_command(cmd).map_err(|e| eyre!(e))
4✔
200
    }
4✔
201

202
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
6✔
203
    where
6✔
204
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
6✔
205
    {
6✔
206
        stream.next().await.ok_or(eyre!("no item received"))
6✔
207
    }
6✔
208

209
    #[tokio::test]
210
    async fn test_true() -> Result<()> {
1✔
211
        let mut stream = stream_cmd(&["true"]).await?;
1✔
212
        let item = stream_next(&mut stream).await?;
1✔
213
        let Item::Done(sts) = item else {
1✔
214
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
215
        };
1✔
216
        assert!(sts.unwrap().success());
1✔
217
        assert!(stream.next().await.is_none());
1✔
218
        Ok(())
1✔
219
    }
1✔
220

221
    #[tokio::test]
222
    async fn test_false() -> Result<()> {
1✔
223
        let mut stream = stream_cmd(&["false"]).await?;
1✔
224
        let item = stream_next(&mut stream).await?;
1✔
225
        let Item::Done(sts) = item else {
1✔
226
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
227
        };
1✔
228
        assert!(!sts.unwrap().success());
1✔
229
        Ok(())
1✔
230
    }
1✔
231

232
    #[tokio::test]
233
    async fn test_echo() -> Result<()> {
1✔
234
        let mut stream = stream_cmd(&["echo", "test"]).await?;
1✔
235
        let item = stream_next(&mut stream).await?;
1✔
236
        let Item::Stdout(s) = item else {
1✔
237
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
238
        };
1✔
239
        assert_eq!(s, "test");
1✔
240
        let item = stream_next(&mut stream).await?;
1✔
241
        let Item::Done(sts) = item else {
1✔
242
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
243
        };
1✔
244
        assert!(sts.unwrap().success());
1✔
245
        Ok(())
1✔
246
    }
1✔
247

248
    #[tokio::test]
249
    async fn test_stderr() -> Result<()> {
1✔
250
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
1✔
251
        let item = stream_next(&mut stream).await?;
1✔
252
        let Item::Stderr(s) = item else {
1✔
253
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
254
        };
1✔
255
        assert_eq!(s, "test");
1✔
256
        let item = stream_next(&mut stream).await?;
1✔
257
        let Item::Done(sts) = item else {
1✔
258
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
259
        };
1✔
260
        assert!(sts.unwrap().success());
1✔
261
        Ok(())
1✔
262
    }
1✔
263

264
    /// Two consecutive readings of the real clock must not go backwards.
    #[test]
    fn test_now() {
        let sys = SysInputReal::default();
        let earlier = sys.now();
        let later = sys.now();
        assert!(&later >= &earlier);
    }
1✔
271

272
    // A simple test for SysInputVirtual as we cover it better in
273
    // downstream tests
274

275
    #[tokio::test]
276
    async fn test_sysinputvirtual() -> Result<()> {
1✔
277
        let list = vec![
1✔
278
            Item::Stdout("stdout".into()),
1✔
279
            Item::Stderr("stderr".into()),
1✔
280
            Item::Done(Ok(ExitStatus::default())),
1✔
281
        ];
1✔
282
        let mut sys = SysInputVirtual::default();
1✔
283
        sys.set_items(list.clone());
1✔
284
        let streamer = sys.run_command(Cmd::default())?;
1✔
285
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
286
        assert_eq!(streamed, list);
1✔
287
        assert_eq!(sys.now(), Instant::default());
1✔
288
        assert_eq!(sys.now(), &Instant::default() + &Duration::seconds(1));
1✔
289
        assert_eq!(sys.size_checked(), Some((25, 80)));
1✔
290
        Ok(())
1✔
291
    }
1✔
292
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc