• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 16103284559

06 Jul 2025 09:03PM UTC coverage: 63.678% (+18.2%) from 45.475%
16103284559

push

github

lpenz
Complete refactoring using layers that should be easier to test

Layers connected via streams, which we can mock and test.
This combines a bunch of commits that documented this slow conversion.

374 of 571 new or added lines in 12 files covered. (65.5%)

2 existing lines in 2 files now uncovered.

419 of 658 relevant lines covered (63.68%)

1.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.65
/src/input.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use color_eyre::Result;
6
use pin_project::pin_project;
7
use std::pin::Pin;
8
use std::task::{Context, Poll};
9
use tokio_stream::Stream;
10
use tokio_stream::wrappers::IntervalStream;
11
use tracing::instrument;
12

13
use crate::process_wrapper;
14
use crate::process_wrapper::Cmd;
15
use crate::process_wrapper::ExitSts;
16
use crate::process_wrapper::ProcessStream;
17
use crate::sys::SysApi;
18
use crate::time_wrapper::Duration;
19
use crate::time_wrapper::Instant;
20

21
#[cfg(test)]
22
use crate::sys::SysVirtual;
23

24
// InputData, InputItem //////////////////////////////////////////////////////
25

26
/// Payload of an [`InputItem`] yielded by [`InputStream`].
#[derive(Debug, Clone, PartialEq, Eq)]
27
pub enum InputData {
28
    /// Yielded right after the command was started successfully.
    Start,
29
    /// A line converted from `process_wrapper::Item::Stdout`.
    LineOut(String),
30
    /// A line converted from `process_wrapper::Item::Stderr`.
    LineErr(String),
31
    /// The process finished; carries the exit status it reported.
    Done(ExitSts),
32
    /// An I/O error, reduced to its [`std::io::ErrorKind`].
    Err(std::io::ErrorKind),
33
    /// Tick yielded while the process is running and has no item ready.
    RunTick,
34
    /// Tick yielded while sleeping between two executions; carries the
    /// wake-up deadline.
    SleepTick(Instant),
35
}
36

37
/// Maps an event coming from the process layer to the payload used by the
/// input layer.
impl From<process_wrapper::Item> for InputData {
    fn from(item: process_wrapper::Item) -> Self {
        match item {
            process_wrapper::Item::Stdout(line) => Self::LineOut(line),
            process_wrapper::Item::Stderr(line) => Self::LineErr(line),
            // The final event carries a `Result`: exit status or error kind.
            process_wrapper::Item::Done(outcome) => match outcome {
                Ok(status) => Self::Done(status),
                Err(kind) => Self::Err(kind),
            },
        }
    }
}
47

48
impl From<std::io::ErrorKind> for InputData {
49
    fn from(e: std::io::ErrorKind) -> Self {
1✔
50
        InputData::Err(e)
1✔
51
    }
1✔
52
}
53

54
impl From<std::io::Error> for InputData {
NEW
55
    fn from(e: std::io::Error) -> Self {
×
NEW
56
        e.kind().into()
×
NEW
57
    }
×
58
}
59

60
/// A timestamped event yielded by [`InputStream`].
#[derive(Debug, Clone, PartialEq, Eq)]
61
pub struct InputItem {
62
    /// Timestamp of the event; [`InputStream`] fills it with `sys.now()`
    /// sampled when it is polled.
    pub time: Instant,
63
    /// What happened.
    pub data: InputData,
64
}
65

66
impl InputItem {
67
    pub fn new<D>(time: Instant, data: D) -> InputItem
6✔
68
    where
6✔
69
        D: Into<InputData>,
6✔
70
    {
71
        Self {
6✔
72
            time,
6✔
73
            data: data.into(),
6✔
74
        }
6✔
75
    }
6✔
76
}
77

78
// InputStream ///////////////////////////////////////////////////////////////
79

80
/// Internal state machine of [`InputStream`].
///
/// `Start` is the default, so it is also the value left behind when the
/// state is moved out with `std::mem::take`.
#[derive(Debug, Default)]
81
enum State {
82
    /// The process is started on the next poll.
83
    #[default]
84
    Start,
85
    /// The process is running and we are yielding lines and ticks.
86
    Running {
87
        /// Events coming from the running process.
88
        process: ProcessStream,
89
        /// Tick events generated by the [`IntervalStream`] timer, built
        /// from the stream's `refresh` duration.
90
        ticker: IntervalStream,
91
    },
92
    /// Sleeping between two process executions, yielding ticks.
93
    Sleeping {
94
        /// When to wake up and start the process again.
95
        deadline: Instant,
96
        /// Ticker with a period of one second.
97
        ticker: IntervalStream,
98
    },
99
    /// Don't execute the process again, either because of an exit
100
    /// condition or an error. The stream yields `None` in this state.
101
    Done,
102
}
103

104
/// Stream that repeatedly runs a command and yields its output lines, exit
/// status and timer ticks as [`InputItem`]s.
#[pin_project]
105
#[derive(Default, Debug)]
106
pub struct InputStream<SI: SysApi> {
107
    /// System abstraction used to read the current time and to run the
    /// command.
    sys: SI,
108
    /// Command to execute; cloned for every run.
    cmd: Cmd,
109
    /// Period of the ticker used while the process is running.
    refresh: Duration,
110
    /// How long to sleep after the process finishes before running it again.
    sleep: Duration,
111
    /// End the stream when the process exits successfully.
    exit_on_success: bool,
112
    /// End the stream when the process exits unsuccessfully.
    exit_on_failure: bool,
113
    /// Current state of the state machine.
    state: State,
114
}
115

116
impl<SI: SysApi> InputStream<SI> {
NEW
117
    pub fn new(sys: SI, cmd: Cmd, refresh: Duration, sleep: Duration) -> Result<Self> {
×
NEW
118
        Ok(Self {
×
NEW
119
            sys,
×
NEW
120
            cmd,
×
NEW
121
            refresh,
×
NEW
122
            sleep,
×
NEW
123
            exit_on_success: false,
×
NEW
124
            exit_on_failure: false,
×
NEW
125
            state: State::Start,
×
NEW
126
        })
×
NEW
127
    }
×
128

129
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
130
        let process = self.sys.run_command(self.cmd.clone())?;
2✔
131
        let ticker = IntervalStream::new(self.refresh.into());
2✔
132
        self.state = State::Running { process, ticker };
2✔
133
        Ok(())
2✔
134
    }
2✔
135

NEW
136
    fn sleep(&mut self, now: Instant) {
×
NEW
137
        let deadline = &now + &self.sleep;
×
NEW
138
        let ticker = IntervalStream::new(Duration::seconds(1).into());
×
NEW
139
        self.state = State::Sleeping { deadline, ticker };
×
NEW
140
    }
×
141
}
142

143
#[cfg(test)]
impl InputStream<SysVirtual> {
    /// Builds a stream on top of a virtual system, for unit tests.
    ///
    /// The command is the default one and both `refresh` and `sleep` are
    /// `Duration::INFINITE`; only the exit conditions are configurable.
    pub fn new_virtual(
        sys: SysVirtual,
        exit_on_success: bool,
        exit_on_failure: bool,
    ) -> Result<Self> {
        let cmd = Cmd::default();
        let refresh = Duration::INFINITE;
        let sleep = Duration::INFINITE;
        let stream = Self {
            sys,
            cmd,
            refresh,
            sleep,
            exit_on_success,
            exit_on_failure,
            state: State::Start,
        };
        Ok(stream)
    }
}
161

162
impl<SI: SysApi> Stream for InputStream<SI> {
163
    type Item = InputItem;
164

165
    #[instrument(level = "debug", ret, skip(cx))]
166
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8✔
167
        let this = self.as_mut().project();
168
        let now = this.sys.now();
169
        let mut state = std::mem::take(&mut *this.state);
170
        return match state {
171
            State::Start => match self.run() {
172
                Ok(_) => Poll::Ready(Some(InputItem::new(now, InputData::Start))),
173
                Err(e) => Poll::Ready(Some(InputItem::new(now, e))),
174
            },
175
            State::Sleeping {
176
                deadline,
177
                ref mut ticker,
178
            } => {
179
                if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
180
                    let tick = InputData::SleepTick(deadline);
181
                    if now < deadline {
182
                        *this.state = state;
183
                        Poll::Ready(Some(InputItem::new(now, tick)))
184
                    } else {
185
                        *this.state = State::Start;
186
                        Poll::Ready(Some(InputItem::new(now, tick)))
187
                    }
188
                } else {
189
                    *this.state = state;
190
                    Poll::Pending
191
                }
192
            }
193
            State::Running {
194
                ref mut process,
195
                ref mut ticker,
196
            } => match Pin::new(process).poll_next(cx) {
197
                Poll::Ready(Some(item)) => match item {
198
                    process_wrapper::Item::Stdout(_) => {
199
                        *this.state = state;
200
                        Poll::Ready(Some(InputItem::new(now, item)))
201
                    }
202
                    process_wrapper::Item::Stderr(_) => {
203
                        *this.state = state;
204
                        Poll::Ready(Some(InputItem::new(now, item)))
205
                    }
206
                    process_wrapper::Item::Done(Ok(ref exitsts)) => {
207
                        let success = exitsts.success();
208
                        if success && *this.exit_on_success || !success && *this.exit_on_failure {
209
                            *this.state = State::Done;
210
                        } else {
211
                            self.sleep(now);
212
                        }
213
                        Poll::Ready(Some(InputItem::new(now, item)))
214
                    }
215
                    process_wrapper::Item::Done(Err(e)) => {
216
                        *this.state = State::Done;
217
                        Poll::Ready(Some(InputItem::new(now, e)))
218
                    }
219
                },
220
                Poll::Ready(None) => {
221
                    #[cfg(not(test))]
222
                    panic!("We should never see the underlying stream end");
223
                    #[cfg(test)]
224
                    {
225
                        *this.state = State::Done;
226
                        Poll::Ready(None)
227
                    }
228
                }
229
                Poll::Pending => {
230
                    // Process doesn't have an item, it must be the ticker
231
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
232
                        *this.state = state;
233
                        Poll::Ready(Some(InputItem::new(now, InputData::RunTick)))
234
                    } else {
235
                        *this.state = state;
236
                        Poll::Pending
237
                    }
238
                }
239
            },
240
            State::Done => {
241
                *this.state = state;
242
                Poll::Ready(None)
243
            }
244
        };
245
    }
8✔
246
}
247

248
// Tests /////////////////////////////////////////////////////////////////////
249

250
#[cfg(test)]
mod tests {
    use color_eyre::Result;
    use std::io;
    use tokio_stream::StreamExt;

    use crate::process_wrapper::Item;
    use crate::sys::SysVirtual;
    use crate::time_wrapper::Instant;

    use super::*;

    /// A process that prints one line on each output and exits is streamed
    /// as `Start`, the two lines and `Done`; the stream then ends because
    /// both exit conditions are enabled.
    #[tokio::test]
    async fn test_basic_success() -> Result<()> {
        let items = vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitSts::default())),
        ];
        let mut sys = SysVirtual::default();
        sys.set_items(items.clone());
        let stream = InputStream::new_virtual(sys, true, true)?;
        let streamed = stream.collect::<Vec<_>>().await;
        // Expected timestamps are produced by successive `incr()` calls, in
        // the order the items are listed.
        let mut clock = Instant::default();
        let expected = vec![
            InputItem {
                time: clock.incr(),
                data: InputData::Start,
            },
            InputItem {
                time: clock.incr(),
                data: InputData::LineOut("stdout".to_owned()),
            },
            InputItem {
                time: clock.incr(),
                data: InputData::LineErr("stderr".to_owned()),
            },
            InputItem {
                time: clock.incr(),
                data: InputData::Done(ExitSts::default()),
            },
        ];
        assert_eq!(streamed, expected);
        Ok(())
    }

    /// An error reported by the process ends the stream right after the
    /// error item, even with both exit conditions disabled.
    #[tokio::test]
    async fn test_done_err() -> Result<()> {
        let items = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
        let mut sys = SysVirtual::default();
        sys.set_items(items.clone());
        let stream = InputStream::new_virtual(sys, false, false)?;
        let streamed = stream.collect::<Vec<_>>().await;
        let mut clock = Instant::default();
        let expected = vec![
            InputItem {
                time: clock.incr(),
                data: InputData::Start,
            },
            InputItem {
                time: clock.incr(),
                data: InputData::Err(io::ErrorKind::UnexpectedEof),
            },
        ];
        assert_eq!(streamed, expected);
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc