• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 14434066913

13 Apr 2025 10:44PM UTC coverage: 70.648% (-5.3%) from 75.983%
14434066913

push

github

lpenz
Migrate to pin_project to be able to use methods; more in Pipe

1 of 50 new or added lines in 3 files covered. (2.0%)

2 existing lines in 1 file now uncovered.

349 of 494 relevant lines covered (70.65%)

2.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.63
/src/sys_input.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Module that wraps system functions used as inputs
6
//!
7
//! Wrapping this makes it very easy to test the whole program.
8

9
use color_eyre::Result;
10
use pin_project::pin_project;
11
use std::cell::RefCell;
12
use std::collections::VecDeque;
13
use std::fmt;
14
use std::io;
15
use std::pin::Pin;
16
use std::process::ExitStatus;
17
use std::process::Stdio;
18
use std::task::{Context, Poll};
19
use tokio::process::Command;
20
use tokio_process_stream as tps;
21
use tokio_stream::Stream;
22

23
use crate::term_wrapper;
24
use crate::time_wrapper::Duration;
25
use crate::time_wrapper::Instant;
26

27
//////////////////////////////////////////////////////////////////////////////
28

29
/// A [`tokio::process::Command`] pseudo-wrapper that `impl Clone`.
30
#[derive(Debug, Default, Clone)]
31
pub struct Cmd(Vec<String>);
32

33
impl Cmd {
34
    pub fn get_command(self) -> Command {
6✔
35
        let mut command = Command::new(&self.0[0]);
6✔
36
        command.args(self.0.iter().skip(1));
6✔
37
        command.stdin(Stdio::null());
6✔
38
        command.stdout(Stdio::piped());
6✔
39
        command.stderr(Stdio::piped());
6✔
40
        command
6✔
41
    }
6✔
42
}
43

44
impl From<Vec<String>> for Cmd {
45
    fn from(s: Vec<String>) -> Cmd {
2✔
46
        Self(s)
2✔
47
    }
2✔
48
}
49

50
impl From<&[&str]> for Cmd {
51
    fn from(s: &[&str]) -> Cmd {
4✔
52
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
53
    }
4✔
54
}
55

56
impl fmt::Display for Cmd {
NEW
57
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
NEW
58
        let joined = self.0.join(" ");
×
NEW
59
        write!(f, "{}", joined)
×
NEW
60
    }
×
61
}
62

63
//////////////////////////////////////////////////////////////////////////////
64

65
/// Counterpart of [`tokio_process_stream::Item`] that can be cloned
/// and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Item {
    /// One line the process wrote to stdout.
    Stdout(String),
    /// One line the process wrote to stderr.
    Stderr(String),
    /// Yielded once the process exits: its
    /// [`ExitStatus`](std::process::ExitStatus), or the kind of the
    /// I/O error that was reported instead.
    Done(Result<ExitStatus, io::ErrorKind>),
}
75

76
impl From<tps::Item<String>> for Item {
77
    fn from(item: tps::Item<String>) -> Self {
6✔
78
        match item {
6✔
79
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
80
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
81
            tps::Item::Done(result) => Item::Done(result.map_err(|e| e.kind())),
4✔
82
        }
83
    }
6✔
84
}
85

86
/// A mockable wrapper for [`tokio_process_stream::ProcessLineStream`].
NEW
87
#[pin_project(project = ProcessStreamProj)]
×
88
pub enum ProcessStream {
89
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
90
    Real { stream: tps::ProcessLineStream },
91
    /// Mock for a running process stream that just returns items from
92
    /// a list. Useful for testing.
93
    Virtual { items: VecDeque<Item> },
94
}
95

96
impl From<tps::ProcessLineStream> for ProcessStream {
97
    fn from(stream: tps::ProcessLineStream) -> Self {
4✔
98
        ProcessStream::Real { stream }
4✔
99
    }
4✔
100
}
101

102
impl From<VecDeque<Item>> for ProcessStream {
103
    fn from(items: VecDeque<Item>) -> Self {
3✔
104
        ProcessStream::Virtual { items }
3✔
105
    }
3✔
106
}
107

108
impl Stream for ProcessStream {
109
    type Item = Item;
110

111
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
21✔
112
        let this = self.project();
21✔
113
        match this {
21✔
114
            ProcessStreamProj::Real { stream } => {
13✔
115
                let next = Pin::new(stream).poll_next(cx);
13✔
116
                if let Poll::Ready(opt) = next {
13✔
117
                    Poll::Ready(opt.map(|i| i.into()))
7✔
118
                } else {
119
                    Poll::Pending
6✔
120
                }
121
            }
122
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
8✔
123
        }
124
    }
21✔
125
}
126

127
//////////////////////////////////////////////////////////////////////////////
128

129
/// Wrap the system functions we use as inputs.
130
///
131
/// This wrapper makes testing easy.
132
pub trait SysInputApi: std::fmt::Debug + Clone + Default {
133
    fn now(&self) -> Instant;
134
    #[allow(dead_code)]
135
    fn size_checked(&self) -> Option<(u16, u16)>;
136
    fn run_command(&mut self, command: Cmd) -> Result<ProcessStream, std::io::Error>;
137
}
138

139
/// [`SysInputApi`] implementation of the real environment.
140
#[derive(Debug, Clone, Default)]
141
pub struct SysInputReal {}
142

143
impl SysInputApi for SysInputReal {
144
    fn now(&self) -> Instant {
4✔
145
        Instant::from(chrono::offset::Utc::now())
4✔
146
    }
4✔
147
    fn size_checked(&self) -> Option<(u16, u16)> {
×
148
        term_wrapper::size_checked()
×
149
    }
×
150
    fn run_command(&mut self, cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
4✔
151
        let process_stream = tps::ProcessLineStream::try_from(cmd.get_command())?;
4✔
152
        Ok(ProcessStream::from(process_stream))
4✔
153
    }
4✔
154
}
155

156
/// [`SysInputApi`] implementation of a virtual environment, to be used in tests.
157
#[derive(Debug, Clone, Default)]
158
pub struct SysInputVirtual {
159
    now: RefCell<Instant>,
160
    items: VecDeque<Item>,
161
}
162

163
impl SysInputApi for SysInputVirtual {
164
    fn now(&self) -> Instant {
10✔
165
        let mut now_ref = self.now.borrow_mut();
10✔
166
        let now = *now_ref;
10✔
167
        *now_ref = &now + &Duration::seconds(1);
10✔
168
        now
10✔
169
    }
10✔
170
    fn size_checked(&self) -> Option<(u16, u16)> {
1✔
171
        Some((25, 80))
1✔
172
    }
1✔
173
    fn run_command(&mut self, _cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
3✔
174
        let items = std::mem::take(&mut self.items);
3✔
175
        Ok(ProcessStream::from(items))
3✔
176
    }
3✔
177
}
178

179
#[cfg(test)]
impl SysInputVirtual {
    /// Replaces the stored items with `items`, keeping their order.
    pub fn set_items(&mut self, items: Vec<Item>) {
        self.items = VecDeque::from(items);
    }
}
185

186
#[cfg(test)]
pub mod test {
    use color_eyre::eyre::eyre;
    use color_eyre::Result;
    use tokio_stream::StreamExt;

    use crate::sys_input::SysInputReal;
    use crate::sys_input::SysInputVirtual;

    use super::*;

    // Tests for SysInputReal with simple unix bins as we don't cover
    // it in downstream tests

    /// Runs `cmdstr` through [`SysInputReal`] and returns the
    /// resulting stream, turning the I/O error into a report.
    async fn stream_cmd(
        cmdstr: &[&str],
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
        let cmd = Cmd::from(cmdstr);
        let mut sys = SysInputReal::default();
        sys.run_command(cmd).map_err(|e| eyre!(e))
    }

    /// Awaits the next item of `stream`, failing if the stream has
    /// already ended.
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
    where
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
    {
        // Build the error lazily: the report is only needed when the
        // stream is exhausted.
        stream
            .next()
            .await
            .ok_or_else(|| eyre!("no item received"))
    }

    /// `true` prints nothing and exits successfully, then the stream ends.
    #[tokio::test]
    async fn test_true() -> Result<()> {
        let mut stream = stream_cmd(&["true"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// `false` prints nothing and exits with a failure status.
    #[tokio::test]
    async fn test_false() -> Result<()> {
        let mut stream = stream_cmd(&["false"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(!sts.unwrap().success());
        Ok(())
    }

    /// A line written to stdout arrives as `Item::Stdout`, followed
    /// by the exit status.
    #[tokio::test]
    async fn test_echo() -> Result<()> {
        let mut stream = stream_cmd(&["echo", "test"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stdout(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// A line written to stderr arrives as `Item::Stderr`, followed
    /// by the exit status.
    #[tokio::test]
    async fn test_stderr() -> Result<()> {
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
        let item = stream_next(&mut stream).await?;
        let Item::Stderr(s) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert_eq!(s, "test");
        let item = stream_next(&mut stream).await?;
        let Item::Done(sts) = item else {
            return Err(eyre!("unexpected stream item {:?}", item));
        };
        assert!(sts.unwrap().success());
        Ok(())
    }

    /// Two consecutive readings of the real clock do not go backwards.
    #[test]
    fn test_now() {
        let sys = SysInputReal::default();
        let now = sys.now();
        let now2 = sys.now();
        assert!(now2 >= now);
    }

    // A simple test for SysInputVirtual as we cover it better in
    // downstream tests

    /// The virtual environment replays the configured items, advances
    /// its clock by one second per reading and reports a fixed size.
    #[tokio::test]
    async fn test_sysinputvirtual() -> Result<()> {
        let list = vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitStatus::default())),
        ];
        let mut sys = SysInputVirtual::default();
        sys.set_items(list.clone());
        let streamer = sys.run_command(Cmd::default())?;
        let streamed = streamer.collect::<Vec<_>>().await;
        assert_eq!(streamed, list);
        assert_eq!(sys.now(), Instant::default());
        assert_eq!(sys.now(), &Instant::default() + &Duration::seconds(1));
        assert_eq!(sys.size_checked(), Some((25, 80)));
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc