• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 15718699515

17 Jun 2025 09:44PM UTC coverage: 71.947% (-0.8%) from 72.71%
15718699515

push

github

lpenz
InputStream refactored and mostly working

More tests pending.

23 of 71 new or added lines in 3 files covered. (32.39%)

5 existing lines in 3 files now uncovered.

377 of 524 relevant lines covered (71.95%)

1.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.36
/src/input_stream.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use color_eyre::Result;
6
use pin_project::pin_project;
7
use std::pin::Pin;
8
use std::process::ExitStatus;
9
use std::task::{Context, Poll};
10
use tokio_stream::wrappers::IntervalStream;
11
use tokio_stream::Stream;
12
use tracing::instrument;
13

14
use crate::sys_input;
15
use crate::sys_input::Cmd;
16
use crate::sys_input::ProcessStream;
17
use crate::sys_input::SysInputApi;
18
use crate::time_wrapper::Duration;
19
use crate::time_wrapper::Instant;
20

21
#[cfg(test)]
22
use crate::sys_input::SysInputVirtual;
23

24
// InputData, InputItem //////////////////////////////////////////////////////
25

26
#[derive(Debug, Clone, PartialEq, Eq)]
27
pub enum InputData {
28
    Start,
29
    LineOut(String),
30
    LineErr(String),
31
    Done(ExitStatus),
32
    Err(std::io::ErrorKind),
33
    RunTick,
34
    SleepTick,
35
}
36

37
impl From<sys_input::Item> for InputData {
38
    fn from(item: sys_input::Item) -> Self {
3✔
39
        match item {
1✔
40
            sys_input::Item::Stdout(l) => InputData::LineOut(l),
1✔
41
            sys_input::Item::Stderr(l) => InputData::LineErr(l),
1✔
42
            sys_input::Item::Done(Ok(sts)) => InputData::Done(sts),
1✔
UNCOV
43
            sys_input::Item::Done(Err(e)) => InputData::Err(e),
×
44
        }
45
    }
3✔
46
}
47

48
impl From<std::io::ErrorKind> for InputData {
49
    fn from(e: std::io::ErrorKind) -> Self {
1✔
50
        InputData::Err(e)
1✔
51
    }
1✔
52
}
53

54
impl From<std::io::Error> for InputData {
55
    fn from(e: std::io::Error) -> Self {
×
NEW
56
        e.kind().into()
×
57
    }
×
58
}
59

60
#[derive(Debug, Clone, PartialEq, Eq)]
61
pub struct InputItem {
62
    pub time: Instant,
63
    pub data: InputData,
64
}
65

66
impl InputItem {
67
    pub fn new<D>(time: Instant, data: D) -> InputItem
6✔
68
    where
6✔
69
        D: Into<InputData>,
6✔
70
    {
6✔
71
        Self {
6✔
72
            time,
6✔
73
            data: data.into(),
6✔
74
        }
6✔
75
    }
6✔
76
}
77

78
// InputStream ///////////////////////////////////////////////////////////////
79

80
#[derive(Debug, Default)]
81
enum State {
82
    /// State where we start the process on the next iteration.
83
    #[default]
84
    Start,
85
    /// The process is running and we are yielding lines and ticks.
86
    Running {
87
        /// Events coming from the running process
88
        process: ProcessStream,
89
        /// Tick events generated by the [`IntervalStream`] timer
90
        ticker: IntervalStream,
91
    },
92
    /// Sleeping between two process executions, yielding ticks.
93
    Sleeping {
94
        /// When to wake up
95
        deadline: Instant,
96
        /// 1s ticker
97
        ticker: IntervalStream,
98
    },
99
    /// Don't execute the process again, either because of an exit
100
    /// condition or an error.
101
    Done,
102
}
103

104
#[pin_project]
8✔
105
#[derive(Default, Debug)]
106
pub struct InputStream<SI: SysInputApi> {
107
    sys_input: SI,
108
    cmd: Cmd,
109
    refresh: Duration,
110
    sleep: Duration,
111
    exit_on_success: bool,
112
    exit_on_failure: bool,
113
    state: State,
114
}
115

116
impl<SI: SysInputApi> InputStream<SI> {
NEW
117
    pub fn new(sys_input: SI, cmd: Cmd, refresh: Duration, sleep: Duration) -> Result<Self> {
×
NEW
118
        eprintln!("refresh {:?} sleep {:?}", refresh, sleep);
×
119
        Ok(Self {
×
120
            sys_input,
×
121
            cmd,
×
NEW
122
            refresh,
×
NEW
123
            sleep,
×
NEW
124
            exit_on_success: false,
×
NEW
125
            exit_on_failure: false,
×
NEW
126
            state: State::Start,
×
127
        })
×
128
    }
×
129

130
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
131
        let process = self.sys_input.run_command(self.cmd.clone())?;
2✔
132
        let ticker = IntervalStream::new(self.refresh.into());
2✔
133
        self.state = State::Running { process, ticker };
2✔
134
        Ok(())
2✔
135
    }
2✔
136

NEW
137
    fn sleep(&mut self, now: Instant) {
×
NEW
138
        let deadline = &now + &self.sleep;
×
NEW
139
        eprintln!("sleep deadline {} ticker for {:?}", deadline, self.sleep);
×
NEW
140
        let ticker = IntervalStream::new(Duration::seconds(1).into());
×
NEW
141
        self.state = State::Sleeping { deadline, ticker };
×
NEW
142
    }
×
143
}
144

145
#[cfg(test)]
146
impl InputStream<SysInputVirtual> {
147
    pub fn new_virtual(
2✔
148
        sys_input: SysInputVirtual,
2✔
149
        exit_on_success: bool,
2✔
150
        exit_on_failure: bool,
2✔
151
    ) -> Result<Self> {
2✔
152
        Ok(Self {
2✔
153
            sys_input,
2✔
154
            cmd: Cmd::default(),
2✔
155
            refresh: Duration::INFINITE,
2✔
156
            sleep: Duration::INFINITE,
2✔
157
            exit_on_success,
2✔
158
            exit_on_failure,
2✔
159
            state: State::Start,
2✔
160
        })
2✔
161
    }
2✔
162
}
163

164
impl<SI: SysInputApi> Stream for InputStream<SI> {
165
    type Item = InputItem;
166

167
    #[instrument(level = "debug", ret, skip(cx))]
168
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169
        let this = self.as_mut().project();
170
        let now = this.sys_input.now();
171
        let mut state = std::mem::take(&mut *this.state);
172
        return match state {
173
            State::Start => match self.run() {
174
                Ok(_) => Poll::Ready(Some(InputItem::new(now, InputData::Start))),
175
                Err(e) => Poll::Ready(Some(InputItem::new(now, e))),
176
            },
177
            State::Sleeping {
178
                deadline,
179
                ref mut ticker,
180
            } => {
181
                if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
182
                    if now >= deadline {
183
                        *this.state = State::Start;
184
                        Poll::Ready(Some(InputItem::new(now, InputData::SleepTick)))
185
                    } else {
186
                        *this.state = state;
187
                        Poll::Ready(Some(InputItem::new(now, InputData::SleepTick)))
188
                    }
189
                } else {
190
                    *this.state = state;
191
                    Poll::Pending
192
                }
193
            }
194
            State::Running {
195
                ref mut process,
196
                ref mut ticker,
197
            } => match Pin::new(process).poll_next(cx) {
198
                Poll::Ready(Some(item)) => match item {
199
                    sys_input::Item::Stdout(_) => {
200
                        *this.state = state;
201
                        Poll::Ready(Some(InputItem::new(now, item)))
202
                    }
203
                    sys_input::Item::Stderr(_) => {
204
                        *this.state = state;
205
                        Poll::Ready(Some(InputItem::new(now, item)))
206
                    }
207
                    sys_input::Item::Done(Ok(exitstatus)) => {
208
                        let success = exitstatus.success();
209
                        if success && *this.exit_on_success || !success && *this.exit_on_failure {
210
                            *this.state = State::Done;
211
                        } else {
212
                            self.sleep(now);
213
                        }
214
                        Poll::Ready(Some(InputItem::new(now, item)))
215
                    }
216
                    sys_input::Item::Done(Err(e)) => {
217
                        *this.state = State::Done;
218
                        Poll::Ready(Some(InputItem::new(now, e)))
219
                    }
220
                },
221
                Poll::Ready(None) => {
222
                    #[cfg(not(test))]
223
                    panic!("We should never see the underlying stream end");
224
                    #[cfg(test)]
225
                    {
226
                        *this.state = State::Done;
227
                        Poll::Ready(None)
228
                    }
229
                }
230
                Poll::Pending => {
231
                    // Process doesn't have an item, it must be the ticker
232
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
233
                        *this.state = state;
234
                        Poll::Ready(Some(InputItem::new(now, InputData::RunTick)))
235
                    } else {
236
                        *this.state = state;
237
                        Poll::Pending
238
                    }
239
                }
240
            },
241
            State::Done => {
242
                *this.state = state;
243
                Poll::Ready(None)
244
            }
245
        };
246
    }
247
}
248

249
// Tests /////////////////////////////////////////////////////////////////////
250

251
#[cfg(test)]
252
mod tests {
253
    use color_eyre::Result;
254
    use std::io;
255
    use tokio_stream::StreamExt;
256

257
    use crate::sys_input::Item;
258
    use crate::sys_input::SysInputVirtual;
259
    use crate::time_wrapper::Instant;
260

261
    use super::*;
262

263
    #[tokio::test]
264
    async fn test_basic_success() -> Result<()> {
1✔
265
        let list = vec![
1✔
266
            Item::Stdout("stdout".into()),
1✔
267
            Item::Stderr("stderr".into()),
1✔
268
            Item::Done(Ok(ExitStatus::default())),
1✔
269
        ];
1✔
270
        let mut sys = SysInputVirtual::default();
1✔
271
        sys.set_items(list.clone());
1✔
272
        let streamer = InputStream::new_virtual(sys, true, true)?;
1✔
273
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
274
        let mut now = Instant::default();
1✔
275
        assert_eq!(
1✔
276
            streamed,
1✔
277
            vec![
1✔
278
                InputItem {
1✔
279
                    time: now.incr(),
1✔
280
                    data: InputData::Start
1✔
281
                },
1✔
282
                InputItem {
1✔
283
                    time: now.incr(),
1✔
284
                    data: InputData::LineOut("stdout".to_owned())
1✔
285
                },
1✔
286
                InputItem {
1✔
287
                    time: now.incr(),
1✔
288
                    data: InputData::LineErr("stderr".to_owned())
1✔
289
                },
1✔
290
                InputItem {
1✔
291
                    time: now.incr(),
1✔
292
                    data: InputData::Done(ExitStatus::default())
1✔
293
                }
1✔
294
            ]
1✔
295
        );
1✔
296
        Ok(())
1✔
297
    }
1✔
298

299
    #[tokio::test]
300
    async fn test_done_err() -> Result<()> {
1✔
301
        let list = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
1✔
302
        let mut sys = SysInputVirtual::default();
1✔
303
        sys.set_items(list.clone());
1✔
304
        let streamer = InputStream::new_virtual(sys, false, false)?;
1✔
305
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
306
        let mut now = Instant::default();
1✔
307
        assert_eq!(
1✔
308
            streamed,
1✔
309
            vec![
1✔
310
                InputItem {
1✔
311
                    time: now.incr(),
1✔
312
                    data: InputData::Start,
1✔
313
                },
1✔
314
                InputItem {
1✔
315
                    time: now.incr(),
1✔
316
                    data: InputData::Err(io::ErrorKind::UnexpectedEof)
1✔
317
                }
1✔
318
            ]
1✔
319
        );
1✔
320
        Ok(())
1✔
321
    }
1✔
322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc