• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 14693599884

27 Apr 2025 03:33PM UTC coverage: 70.713%. Remained the same
14693599884

push

github

lpenz
Add tracing.instrument to ProcessStream and InputStream

0 of 19 new or added lines in 2 files covered. (0.0%)

9 existing lines in 2 files now uncovered.

367 of 519 relevant lines covered (70.71%)

1.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.41
/src/sys_input.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Module that wraps system functions used as inputs
6
//!
7
//! Wrapping this makes it very easy to test the whole program.
8

9
use color_eyre::Result;
10
use pin_project::pin_project;
11
use std::cell::RefCell;
12
use std::collections::VecDeque;
13
use std::fmt;
14
use std::io;
15
use std::pin::Pin;
16
use std::process::ExitStatus;
17
use std::process::Stdio;
18
use std::task::{Context, Poll};
19
use tokio::process::Command;
20
use tokio_process_stream as tps;
21
use tokio_stream::Stream;
22
use tracing::instrument;
23

24
use crate::term_wrapper;
25
use crate::time_wrapper::Duration;
26
use crate::time_wrapper::Instant;
27

28
//////////////////////////////////////////////////////////////////////////////
29

30
/// A [`tokio::process::Command`] pseudo-wrapper that `impl Clone`.
31
#[derive(Debug, Default, Clone)]
32
pub struct Cmd(Vec<String>);
33

34
impl Cmd {
35
    pub fn get_command(self) -> Command {
6✔
36
        let mut command = Command::new(&self.0[0]);
6✔
37
        command.args(self.0.iter().skip(1));
6✔
38
        command.stdin(Stdio::null());
6✔
39
        command.stdout(Stdio::piped());
6✔
40
        command.stderr(Stdio::piped());
6✔
41
        command
6✔
42
    }
6✔
43
}
44

45
impl From<Vec<String>> for Cmd {
46
    fn from(s: Vec<String>) -> Cmd {
2✔
47
        Self(s)
2✔
48
    }
2✔
49
}
50

51
impl From<&[&str]> for Cmd {
52
    fn from(s: &[&str]) -> Cmd {
4✔
53
        Self(s.iter().map(|s| s.to_string()).collect::<Vec<_>>())
7✔
54
    }
4✔
55
}
56

57
impl fmt::Display for Cmd {
UNCOV
58
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
59
        let joined = self.0.join(" ");
×
60
        write!(f, "{}", joined)
×
61
    }
×
62
}
63

64
//////////////////////////////////////////////////////////////////////////////
65

66
/// A clonable, PartialEq wrapper for [`tokio_process_stream::Item`]
67
#[derive(Debug, Clone, PartialEq, Eq)]
68
pub enum Item {
69
    /// A stdout line printed by the process.
70
    Stdout(String),
71
    /// A stderr line printed by the process.
72
    Stderr(String),
73
    /// The [`ExitStatus`](std::process::ExitStatus), yielded after the process exits.
74
    Done(Result<ExitStatus, io::ErrorKind>),
75
}
76

77
impl From<tps::Item<String>> for Item {
78
    fn from(item: tps::Item<String>) -> Self {
6✔
79
        match item {
6✔
80
            tps::Item::Stdout(s) => Item::Stdout(s),
1✔
81
            tps::Item::Stderr(s) => Item::Stderr(s),
1✔
82
            tps::Item::Done(result) => Item::Done(result.map_err(|e| e.kind())),
4✔
83
        }
84
    }
6✔
85
}
86

87
/// A mockable wrapper for [`tokio_process_stream::ProcessLineStream`].
UNCOV
88
#[pin_project(project = ProcessStreamProj)]
×
89
pub enum ProcessStream {
90
    /// Wrapper for [`tokio_process_stream::ProcessLineStream`].
91
    Real { stream: tps::ProcessLineStream },
92
    /// Mock for a running process stream that just returns items from
93
    /// a list. Useful for testing.
94
    Virtual { items: VecDeque<Item> },
95
}
96

97
impl std::fmt::Debug for ProcessStream {
NEW
98
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
99
        match self {
×
NEW
100
            ProcessStream::Real { stream: _ } => f.debug_struct("ProcessStream::Real"),
×
NEW
101
            ProcessStream::Virtual { items: _ } => f.debug_struct("ProcessStream::Virtual"),
×
102
        }
NEW
103
        .finish()
×
NEW
104
    }
×
105
}
106

107
impl From<tps::ProcessLineStream> for ProcessStream {
108
    fn from(stream: tps::ProcessLineStream) -> Self {
4✔
109
        ProcessStream::Real { stream }
4✔
110
    }
4✔
111
}
112

113
impl From<VecDeque<Item>> for ProcessStream {
114
    fn from(items: VecDeque<Item>) -> Self {
3✔
115
        ProcessStream::Virtual { items }
3✔
116
    }
3✔
117
}
118

119
impl Stream for ProcessStream {
120
    type Item = Item;
121

122
    #[instrument(level = "debug", ret, skip(cx))]
123
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124
        let this = self.project();
125
        match this {
126
            ProcessStreamProj::Real { stream } => {
127
                let next = Pin::new(stream).poll_next(cx);
128
                if let Poll::Ready(opt) = next {
129
                    Poll::Ready(opt.map(|i| i.into()))
6✔
130
                } else {
131
                    Poll::Pending
132
                }
133
            }
134
            ProcessStreamProj::Virtual { items } => Poll::Ready(items.pop_front()),
135
        }
136
    }
137
}
138

139
//////////////////////////////////////////////////////////////////////////////
140

141
/// Wrap the system functions we use as inputs.
142
///
143
/// This wrapper makes testing easy.
144
pub trait SysInputApi: std::fmt::Debug + Clone + Default {
145
    fn now(&self) -> Instant;
146
    #[allow(dead_code)]
147
    fn size_checked(&self) -> Option<(u16, u16)>;
148
    fn run_command(&mut self, command: Cmd) -> Result<ProcessStream, std::io::Error>;
149
}
150

151
/// [`SysInputApi`] implementation of the real environment.
152
#[derive(Debug, Clone, Default)]
153
pub struct SysInputReal {}
154

155
impl SysInputApi for SysInputReal {
156
    fn now(&self) -> Instant {
4✔
157
        Instant::from(chrono::offset::Utc::now())
4✔
158
    }
4✔
UNCOV
159
    fn size_checked(&self) -> Option<(u16, u16)> {
×
UNCOV
160
        term_wrapper::size_checked()
×
UNCOV
161
    }
×
162
    fn run_command(&mut self, cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
4✔
163
        let process_stream = tps::ProcessLineStream::try_from(cmd.get_command())?;
4✔
164
        Ok(ProcessStream::from(process_stream))
4✔
165
    }
4✔
166
}
167

168
/// [`SysInputApi`] implementation of a virtual environment, to be used in tests.
169
#[derive(Debug, Clone, Default)]
170
pub struct SysInputVirtual {
171
    now: RefCell<Instant>,
172
    items: VecDeque<Item>,
173
}
174

175
impl SysInputApi for SysInputVirtual {
176
    fn now(&self) -> Instant {
10✔
177
        let mut now_ref = self.now.borrow_mut();
10✔
178
        let now = *now_ref;
10✔
179
        *now_ref = &now + &Duration::seconds(1);
10✔
180
        now
10✔
181
    }
10✔
182
    fn size_checked(&self) -> Option<(u16, u16)> {
1✔
183
        Some((25, 80))
1✔
184
    }
1✔
185
    fn run_command(&mut self, _cmd: Cmd) -> Result<ProcessStream, std::io::Error> {
3✔
186
        let items = std::mem::take(&mut self.items);
3✔
187
        Ok(ProcessStream::from(items))
3✔
188
    }
3✔
189
}
190

191
#[cfg(test)]
192
impl SysInputVirtual {
193
    pub fn set_items(&mut self, items: Vec<Item>) {
3✔
194
        self.items = items.into_iter().collect();
3✔
195
    }
3✔
196
}
197

198
#[cfg(test)]
199
pub mod test {
200
    use color_eyre::eyre::eyre;
201
    use color_eyre::Result;
202
    use tokio_stream::StreamExt;
203

204
    use crate::sys_input::SysInputReal;
205
    use crate::sys_input::SysInputVirtual;
206

207
    use super::*;
208

209
    // Tests for SysInputReal with simple unix bins as we don't cover
210
    // it in downstream tests
211

212
    async fn stream_cmd(
4✔
213
        cmdstr: &[&str],
4✔
214
    ) -> Result<impl StreamExt<Item = Item> + std::marker::Unpin + Send + 'static> {
4✔
215
        let cmd = Cmd::from(cmdstr);
4✔
216
        let mut sys = SysInputReal::default();
4✔
217
        sys.run_command(cmd).map_err(|e| eyre!(e))
4✔
218
    }
4✔
219

220
    async fn stream_next<T>(stream: &mut T) -> Result<Item>
6✔
221
    where
6✔
222
        T: StreamExt<Item = Item> + std::marker::Unpin + Send + 'static,
6✔
223
    {
6✔
224
        stream.next().await.ok_or(eyre!("no item received"))
6✔
225
    }
6✔
226

227
    #[tokio::test]
228
    async fn test_true() -> Result<()> {
1✔
229
        let mut stream = stream_cmd(&["true"]).await?;
1✔
230
        let item = stream_next(&mut stream).await?;
1✔
231
        let Item::Done(sts) = item else {
1✔
232
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
233
        };
1✔
234
        assert!(sts.unwrap().success());
1✔
235
        assert!(stream.next().await.is_none());
1✔
236
        Ok(())
1✔
237
    }
1✔
238

239
    #[tokio::test]
240
    async fn test_false() -> Result<()> {
1✔
241
        let mut stream = stream_cmd(&["false"]).await?;
1✔
242
        let item = stream_next(&mut stream).await?;
1✔
243
        let Item::Done(sts) = item else {
1✔
244
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
245
        };
1✔
246
        assert!(!sts.unwrap().success());
1✔
247
        Ok(())
1✔
248
    }
1✔
249

250
    #[tokio::test]
251
    async fn test_echo() -> Result<()> {
1✔
252
        let mut stream = stream_cmd(&["echo", "test"]).await?;
1✔
253
        let item = stream_next(&mut stream).await?;
1✔
254
        let Item::Stdout(s) = item else {
1✔
255
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
256
        };
1✔
257
        assert_eq!(s, "test");
1✔
258
        let item = stream_next(&mut stream).await?;
1✔
259
        let Item::Done(sts) = item else {
1✔
260
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
261
        };
1✔
262
        assert!(sts.unwrap().success());
1✔
263
        Ok(())
1✔
264
    }
1✔
265

266
    #[tokio::test]
267
    async fn test_stderr() -> Result<()> {
1✔
268
        let mut stream = stream_cmd(&["/bin/sh", "-c", "echo test >&2"]).await?;
1✔
269
        let item = stream_next(&mut stream).await?;
1✔
270
        let Item::Stderr(s) = item else {
1✔
271
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
272
        };
1✔
273
        assert_eq!(s, "test");
1✔
274
        let item = stream_next(&mut stream).await?;
1✔
275
        let Item::Done(sts) = item else {
1✔
276
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
277
        };
1✔
278
        assert!(sts.unwrap().success());
1✔
279
        Ok(())
1✔
280
    }
1✔
281

282
    #[test]
283
    fn test_now() {
1✔
284
        let sys = SysInputReal::default();
1✔
285
        let now = sys.now();
1✔
286
        let now2 = sys.now();
1✔
287
        assert!(&now2 >= &now);
1✔
288
    }
1✔
289

290
    // A simple test for SysInputVirtual as we cover it better in
291
    // downstream tests
292

293
    #[tokio::test]
294
    async fn test_sysinputvirtual() -> Result<()> {
1✔
295
        let list = vec![
1✔
296
            Item::Stdout("stdout".into()),
1✔
297
            Item::Stderr("stderr".into()),
1✔
298
            Item::Done(Ok(ExitStatus::default())),
1✔
299
        ];
1✔
300
        let mut sys = SysInputVirtual::default();
1✔
301
        sys.set_items(list.clone());
1✔
302
        let streamer = sys.run_command(Cmd::default())?;
1✔
303
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
304
        assert_eq!(streamed, list);
1✔
305
        assert_eq!(sys.now(), Instant::default());
1✔
306
        assert_eq!(sys.now(), &Instant::default() + &Duration::seconds(1));
1✔
307
        assert_eq!(sys.size_checked(), Some((25, 80)));
1✔
308
        Ok(())
1✔
309
    }
1✔
310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc