• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 15766954094

19 Jun 2025 09:59PM UTC coverage: 60.811% (-12.3%) from 73.105%
15766954094

push

github

lpenz
Pipe functional; progress bar missing

As is, ogle works, but shows no progress bar while the process is
executing.

2 of 103 new or added lines in 3 files covered. (1.94%)

5 existing lines in 2 files now uncovered.

405 of 666 relevant lines covered (60.81%)

1.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.67
/src/input_stream.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use color_eyre::Result;
6
use pin_project::pin_project;
7
use std::pin::Pin;
8
use std::process::ExitStatus;
9
use std::task::{Context, Poll};
10
use tokio_stream::Stream;
11
use tokio_stream::wrappers::IntervalStream;
12
use tracing::instrument;
13

14
use crate::sys_input;
15
use crate::sys_input::Cmd;
16
use crate::sys_input::ProcessStream;
17
use crate::sys_input::SysInputApi;
18
use crate::time_wrapper::Duration;
19
use crate::time_wrapper::Instant;
20

21
#[cfg(test)]
22
use crate::sys_input::SysInputVirtual;
23

24
// InputData, InputItem //////////////////////////////////////////////////////
25

26
/// Payload of an [`InputItem`] yielded by the input stream.
#[derive(Debug, Clone, PartialEq, Eq)]
27
pub enum InputData {
28
    /// Yielded right after the command was started successfully.
    RunStart,
29
    /// Yielded when the sleeping state (deadline and ticker) has been set up.
    SleepStart,
30
    /// A line converted from `sys_input::Item::Stdout`.
    LineOut(String),
31
    /// A line converted from `sys_input::Item::Stderr`.
    LineErr(String),
32
    /// The process finished; carries its exit status.
    Done(ExitStatus),
33
    /// An I/O error, reduced to its kind.
    Err(std::io::ErrorKind),
34
    /// Ticker event yielded while the process is running and has no item ready.
    RunTick,
35
    /// Ticker event yielded while sleeping; carries the sleep deadline.
    SleepTick(Instant),
36
}
37

38
impl From<sys_input::Item> for InputData {
    /// Translates a raw process event into the payload exposed by the
    /// input stream: output lines map one-to-one, and the final result
    /// becomes either [`InputData::Done`] or [`InputData::Err`].
    fn from(item: sys_input::Item) -> Self {
        match item {
            sys_input::Item::Done(outcome) => match outcome {
                Ok(status) => Self::Done(status),
                Err(kind) => Self::Err(kind),
            },
            sys_input::Item::Stdout(line) => Self::LineOut(line),
            sys_input::Item::Stderr(line) => Self::LineErr(line),
        }
    }
}
48

49
impl From<std::io::ErrorKind> for InputData {
50
    fn from(e: std::io::ErrorKind) -> Self {
1✔
51
        InputData::Err(e)
1✔
52
    }
1✔
53
}
54

55
impl From<std::io::Error> for InputData {
56
    fn from(e: std::io::Error) -> Self {
×
57
        e.kind().into()
×
58
    }
×
59
}
60

61
/// A single event yielded by the input stream: a payload plus the time
/// at which it was produced.
#[derive(Debug, Clone, PartialEq, Eq)]
62
pub struct InputItem {
63
    /// Time reported by the system input when the item was created.
    pub time: Instant,
64
    /// What happened.
    pub data: InputData,
65
}
66

67
impl InputItem {
68
    pub fn new<D>(time: Instant, data: D) -> InputItem
6✔
69
    where
6✔
70
        D: Into<InputData>,
6✔
71
    {
6✔
72
        Self {
6✔
73
            time,
6✔
74
            data: data.into(),
6✔
75
        }
6✔
76
    }
6✔
77
}
78

79
// InputStream ///////////////////////////////////////////////////////////////
80

81
/// Data kept while sleeping between two executions of the command.
#[derive(Debug)]
82
struct SleepingState {
83
    /// When to wake up
84
    deadline: Instant,
85
    /// 1s ticker; each tick is reported as a `SleepTick`
86
    ticker: IntervalStream,
87
}
88

89
/// State machine driving the input stream.
///
/// `Start` is the default, so taking the state with `std::mem::take`
/// leaves `Start` behind.
#[derive(Debug, Default)]
90
enum State {
91
    /// State where we start the process on the next iteration.
92
    #[default]
93
    Start,
94
    /// The process is running and we are yielding lines and ticks.
95
    Running {
96
        /// Events coming from the running process
97
        process: ProcessStream,
98
        /// Tick events generated by the [`IntervalStream`] timer
99
        ticker: IntervalStream,
100
    },
101
    /// Sleeping between two process executions, yielding ticks.
    /// `None` means the deadline and ticker have not been set up yet.
102
    Sleeping(Option<SleepingState>),
103
    /// Don't execute the process again, either because of an exit
104
    /// condition or an error.
105
    Done,
106
}
107

108
/// Stream that runs a command repeatedly and yields [`InputItem`]s for
/// its output lines, its termination, and periodic ticks.
#[pin_project]
8✔
109
#[derive(Default, Debug)]
110
pub struct InputStream<SI: SysInputApi> {
111
    /// System abstraction used to get the current time and run the command.
    sys_input: SI,
112
    /// Command to run; cloned for every execution.
    cmd: Cmd,
113
    /// Period of the ticker used while the process is running.
    refresh: Duration,
114
    /// Added to the current time to compute the wake-up deadline when sleeping.
    sleep: Duration,
115
    /// Stop (instead of sleeping) when the process exits successfully.
    exit_on_success: bool,
116
    /// Stop (instead of sleeping) when the process exits unsuccessfully.
    exit_on_failure: bool,
117
    /// Current state of the state machine.
    state: State,
118
}
119

120
impl<SI: SysInputApi> InputStream<SI> {
121
    pub fn new(sys_input: SI, cmd: Cmd, refresh: Duration, sleep: Duration) -> Result<Self> {
×
122
        Ok(Self {
×
123
            sys_input,
×
124
            cmd,
×
125
            refresh,
×
126
            sleep,
×
127
            exit_on_success: false,
×
128
            exit_on_failure: false,
×
129
            state: State::Start,
×
130
        })
×
131
    }
×
132

133
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
134
        let process = self.sys_input.run_command(self.cmd.clone())?;
2✔
135
        let ticker = IntervalStream::new(self.refresh.into());
2✔
136
        self.state = State::Running { process, ticker };
2✔
137
        Ok(())
2✔
138
    }
2✔
139

NEW
140
    fn sleep(&mut self, now: Instant) -> InputItem {
×
141
        let deadline = &now + &self.sleep;
×
142
        let ticker = IntervalStream::new(Duration::seconds(1).into());
×
NEW
143
        self.state = State::Sleeping(Some(SleepingState { deadline, ticker }));
×
NEW
144
        let tick = InputData::SleepTick(deadline);
×
NEW
145
        InputItem::new(now, tick)
×
UNCOV
146
    }
×
147
}
148

149
#[cfg(test)]
impl InputStream<SysInputVirtual> {
    /// Test-only constructor on top of a virtual system input.
    ///
    /// Uses the default command and infinite refresh and sleep
    /// durations; only the exit flags are configurable. It always
    /// returns `Ok`.
    pub fn new_virtual(
        sys_input: SysInputVirtual,
        exit_on_success: bool,
        exit_on_failure: bool,
    ) -> Result<Self> {
        let stream = Self {
            state: State::Start,
            cmd: Cmd::default(),
            refresh: Duration::INFINITE,
            sleep: Duration::INFINITE,
            sys_input,
            exit_on_success,
            exit_on_failure,
        };
        Ok(stream)
    }
}
167

168
impl<SI: SysInputApi> Stream for InputStream<SI> {
169
    type Item = InputItem;
170

171
    #[instrument(level = "debug", ret, skip(cx))]
172
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8✔
173
        let this = self.as_mut().project();
174
        let now = this.sys_input.now();
175
        let mut state = std::mem::take(&mut *this.state);
176
        return match state {
177
            State::Start => match self.run() {
178
                Ok(_) => Poll::Ready(Some(InputItem::new(now, InputData::RunStart))),
179
                Err(e) => Poll::Ready(Some(InputItem::new(now, e))),
180
            },
181
            State::Sleeping(None) => {
182
                // Set up sleeping state: deadline, ticker
183
                self.sleep(now);
184
                Poll::Ready(Some(InputItem::new(now, InputData::SleepStart)))
185
            }
186
            State::Sleeping(Some(SleepingState {
187
                deadline,
188
                ref mut ticker,
189
            })) => {
190
                if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
191
                    let tick = InputData::SleepTick(deadline);
192
                    if now < deadline {
193
                        *this.state = state;
194
                        Poll::Ready(Some(InputItem::new(now, tick)))
195
                    } else {
196
                        *this.state = State::Start;
197
                        Poll::Ready(Some(InputItem::new(now, tick)))
198
                    }
199
                } else {
200
                    *this.state = state;
201
                    Poll::Pending
202
                }
203
            }
204
            State::Running {
205
                ref mut process,
206
                ref mut ticker,
207
            } => match Pin::new(process).poll_next(cx) {
208
                Poll::Ready(Some(item)) => match item {
209
                    sys_input::Item::Stdout(_) => {
210
                        *this.state = state;
211
                        Poll::Ready(Some(InputItem::new(now, item)))
212
                    }
213
                    sys_input::Item::Stderr(_) => {
214
                        *this.state = state;
215
                        Poll::Ready(Some(InputItem::new(now, item)))
216
                    }
217
                    sys_input::Item::Done(Ok(exitstatus)) => {
218
                        let success = exitstatus.success();
219
                        if success && *this.exit_on_success || !success && *this.exit_on_failure {
220
                            *this.state = State::Done;
221
                        } else {
222
                            *this.state = State::Sleeping(None);
223
                        }
224
                        Poll::Ready(Some(InputItem::new(now, item)))
225
                    }
226
                    sys_input::Item::Done(Err(e)) => {
227
                        *this.state = State::Done;
228
                        Poll::Ready(Some(InputItem::new(now, e)))
229
                    }
230
                },
231
                Poll::Ready(None) => {
232
                    #[cfg(not(test))]
233
                    panic!("We should never see the underlying stream end");
234
                    #[cfg(test)]
235
                    {
236
                        *this.state = State::Done;
237
                        Poll::Ready(None)
238
                    }
239
                }
240
                Poll::Pending => {
241
                    // Process doesn't have an item, it must be the ticker
242
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
243
                        *this.state = state;
244
                        Poll::Ready(Some(InputItem::new(now, InputData::RunTick)))
245
                    } else {
246
                        *this.state = state;
247
                        Poll::Pending
248
                    }
249
                }
250
            },
251
            State::Done => {
252
                *this.state = state;
253
                Poll::Ready(None)
254
            }
255
        };
256
    }
8✔
257
}
258

259
// Tests /////////////////////////////////////////////////////////////////////
260

261
#[cfg(test)]
mod tests {
    use color_eyre::Result;
    use std::io;
    use tokio_stream::StreamExt;

    use crate::sys_input::Item;
    use crate::sys_input::SysInputVirtual;
    use crate::time_wrapper::Instant;

    use super::*;

    /// Feeds `items` to a virtual system input and collects everything
    /// the resulting stream yields.
    async fn collect_virtual(
        items: Vec<Item>,
        exit_on_success: bool,
        exit_on_failure: bool,
    ) -> Result<Vec<InputItem>> {
        let mut sys = SysInputVirtual::default();
        sys.set_items(items);
        let streamer = InputStream::new_virtual(sys, exit_on_success, exit_on_failure)?;
        Ok(streamer.collect::<Vec<_>>().await)
    }

    /// A successful run yields the start marker, both lines, and the
    /// exit status, then ends because `exit_on_success` is set.
    #[tokio::test]
    async fn test_basic_success() -> Result<()> {
        let items = vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitStatus::default())),
        ];
        let streamed = collect_virtual(items, true, true).await?;
        let mut now = Instant::default();
        let expected = vec![
            InputItem::new(now.incr(), InputData::RunStart),
            InputItem::new(now.incr(), InputData::LineOut("stdout".to_owned())),
            InputItem::new(now.incr(), InputData::LineErr("stderr".to_owned())),
            InputItem::new(now.incr(), InputData::Done(ExitStatus::default())),
        ];
        assert_eq!(streamed, expected);
        Ok(())
    }

    /// An error result from the process ends the stream even when no
    /// exit flag is set.
    #[tokio::test]
    async fn test_done_err() -> Result<()> {
        let items = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
        let streamed = collect_virtual(items, false, false).await?;
        let mut now = Instant::default();
        let expected = vec![
            InputItem::new(now.incr(), InputData::RunStart),
            InputItem::new(now.incr(), InputData::Err(io::ErrorKind::UnexpectedEof)),
        ];
        assert_eq!(streamed, expected);
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc