• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 16154576757

08 Jul 2025 09:16PM UTC coverage: 62.319% (-0.6%) from 62.958%
16154576757

push

github

lpenz
Improve how sleeping and deadline are handled

- No longer keep RunTick and SleepTick, we only need Tick.
- Create a StartSleeping event that specifies the deadline.

2 of 14 new or added lines in 2 files covered. (14.29%)

2 existing lines in 2 files now uncovered.

430 of 690 relevant lines covered (62.32%)

1.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.31
/src/engine.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Main lower-level module that takes care of running the command and
6
//! yielding all possible events into a coherent stream of timestamped
7
//! events.
8

9
use color_eyre::Result;
10
use pin_project::pin_project;
11
use std::pin::Pin;
12
use std::task::{Context, Poll};
13
use tokio_stream::Stream;
14
use tokio_stream::wrappers::IntervalStream;
15
use tracing::instrument;
16

17
use crate::process_wrapper;
18
use crate::process_wrapper::Cmd;
19
use crate::process_wrapper::ExitSts;
20
use crate::process_wrapper::ProcessStream;
21
use crate::sys::SysApi;
22
use crate::time_wrapper::Duration;
23
use crate::time_wrapper::Instant;
24
use crate::user_wrapper::UserStream;
25

26
// EData, EItem //////////////////////////////////////////////////////
27

28
#[derive(Debug, Clone, PartialEq, Eq)]
29
pub enum EData {
30
    StartRun,
31
    StartSleep(Instant),
32
    LineOut(String),
33
    LineErr(String),
34
    Done(ExitSts),
35
    Err(std::io::ErrorKind),
36
    Tick,
37
}
38

39
impl From<process_wrapper::Item> for EData {
40
    fn from(item: process_wrapper::Item) -> Self {
3✔
41
        match item {
1✔
42
            process_wrapper::Item::Stdout(l) => EData::LineOut(l),
1✔
43
            process_wrapper::Item::Stderr(l) => EData::LineErr(l),
1✔
44
            process_wrapper::Item::Done(Ok(sts)) => EData::Done(sts),
1✔
45
            process_wrapper::Item::Done(Err(e)) => EData::Err(e),
×
46
        }
47
    }
3✔
48
}
49

50
impl From<std::io::ErrorKind> for EData {
51
    fn from(e: std::io::ErrorKind) -> Self {
1✔
52
        EData::Err(e)
1✔
53
    }
1✔
54
}
55

56
impl From<std::io::Error> for EData {
57
    fn from(e: std::io::Error) -> Self {
×
58
        e.kind().into()
×
59
    }
×
60
}
61

62
#[derive(Debug, Clone, PartialEq, Eq)]
63
pub struct EItem {
64
    pub time: Instant,
65
    pub data: EData,
66
}
67

68
impl EItem {
69
    pub fn new<D>(time: Instant, data: D) -> EItem
6✔
70
    where
6✔
71
        D: Into<EData>,
6✔
72
    {
73
        Self {
6✔
74
            time,
6✔
75
            data: data.into(),
6✔
76
        }
6✔
77
    }
6✔
78
}
79

80
// Engine ////////////////////////////////////////////////////////////
81

82
#[derive(Debug, Default)]
83
enum State {
84
    /// State where we start the process on the next iteration.
85
    #[default]
86
    Start,
87
    StartSleeping,
88
    /// The process is running and we are yielding lines and ticks.
89
    Running {
90
        /// Events coming from the running process
91
        process: ProcessStream,
92
        /// Tick events generated by the [`IntervalStream`] timer
93
        ticker: IntervalStream,
94
    },
95
    /// Sleeping between two process executions, yielding ticks.
96
    Sleeping {
97
        /// When to wake up
98
        deadline: Instant,
99
        /// 1s ticker
100
        ticker: IntervalStream,
101
    },
102
    /// Don't execute the process again, either because of an exit
103
    /// condition or an error.
104
    Done,
105
}
106

107
#[pin_project]
108
#[derive(Default, Debug)]
109
pub struct Engine<SI: SysApi> {
110
    sys: SI,
111
    cmd: Cmd,
112
    refresh: Duration,
113
    sleep: Duration,
114
    exit_on_success: bool,
115
    exit_on_failure: bool,
116
    state: State,
117
    user: Option<UserStream>,
118
    exit_by_user: bool,
119
}
120

121
impl<SI: SysApi> Engine<SI> {
122
    pub fn new(mut sys: SI, cmd: Cmd, refresh: Duration, sleep: Duration) -> Result<Self> {
×
123
        let user_stream = sys.user_stream();
×
124
        Ok(Self {
×
125
            sys,
×
126
            cmd,
×
127
            refresh,
×
128
            sleep,
×
129
            exit_on_success: false,
×
130
            exit_on_failure: false,
×
131
            state: State::Start,
×
132
            user: user_stream,
×
133
            exit_by_user: false,
×
134
        })
×
135
    }
×
136

137
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
138
        let process = self.sys.run_command(self.cmd.clone())?;
2✔
139
        let ticker = IntervalStream::new(self.refresh.into());
2✔
140
        self.state = State::Running { process, ticker };
2✔
141
        Ok(())
2✔
142
    }
2✔
143

NEW
144
    fn sleep(&mut self, now: Instant) -> EItem {
×
145
        let deadline = &now + &self.sleep;
×
146
        let ticker = IntervalStream::new(Duration::seconds(1).into());
×
147
        self.state = State::Sleeping { deadline, ticker };
×
NEW
148
        EItem::new(now, EData::StartSleep(deadline))
×
UNCOV
149
    }
×
150
}
151

152
impl<SI: SysApi> Stream for Engine<SI> {
153
    type Item = EItem;
154

155
    #[instrument(level = "debug", ret, skip(cx))]
156
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8✔
157
        let this = self.as_mut().project();
158
        let now = this.sys.now();
159
        if let Some(user) = this.user {
160
            match Pin::new(user).poll_next(cx) {
161
                Poll::Ready(Some(_)) => {
162
                    *this.exit_by_user = true;
163
                    *this.user = None;
164
                }
165
                Poll::Ready(None) => {
166
                    *this.user = None;
167
                }
168
                Poll::Pending => {}
169
            };
170
        }
171
        let mut state = std::mem::take(&mut *this.state);
172
        return match state {
173
            State::Start => match self.run() {
174
                Ok(_) => Poll::Ready(Some(EItem::new(now, EData::StartRun))),
175
                Err(e) => Poll::Ready(Some(EItem::new(now, e))),
176
            },
177
            State::StartSleeping => {
178
                let item = self.sleep(now);
179
                Poll::Ready(Some(item))
180
            }
181
            State::Sleeping {
182
                deadline,
183
                ref mut ticker,
184
            } => {
185
                if *this.exit_by_user {
186
                    *this.state = State::Done;
187
                    Poll::Ready(None)
188
                } else if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
189
                    let tick = EData::Tick;
190
                    if now < deadline {
191
                        *this.state = state;
192
                        Poll::Ready(Some(EItem::new(now, tick)))
193
                    } else {
194
                        *this.state = State::Start;
195
                        Poll::Ready(Some(EItem::new(now, tick)))
196
                    }
197
                } else {
198
                    *this.state = state;
199
                    Poll::Pending
200
                }
201
            }
202
            State::Running {
203
                ref mut process,
204
                ref mut ticker,
205
            } => match Pin::new(process).poll_next(cx) {
206
                Poll::Ready(Some(item)) => match item {
207
                    process_wrapper::Item::Stdout(_) => {
208
                        *this.state = state;
209
                        Poll::Ready(Some(EItem::new(now, item)))
210
                    }
211
                    process_wrapper::Item::Stderr(_) => {
212
                        *this.state = state;
213
                        Poll::Ready(Some(EItem::new(now, item)))
214
                    }
215
                    process_wrapper::Item::Done(Ok(ref exitsts)) => {
216
                        let success = exitsts.success();
217
                        if *this.exit_by_user
218
                            || success && *this.exit_on_success
219
                            || !success && *this.exit_on_failure
220
                        {
221
                            *this.state = State::Done;
222
                        } else {
223
                            *this.state = State::StartSleeping;
224
                        }
225
                        Poll::Ready(Some(EItem::new(now, item)))
226
                    }
227
                    process_wrapper::Item::Done(Err(e)) => {
228
                        *this.state = State::Done;
229
                        Poll::Ready(Some(EItem::new(now, e)))
230
                    }
231
                },
232
                Poll::Ready(None) => {
233
                    #[cfg(not(test))]
234
                    panic!("We should never see the underlying stream end");
235
                    #[cfg(test)]
236
                    {
237
                        *this.state = State::Done;
238
                        Poll::Ready(None)
239
                    }
240
                }
241
                Poll::Pending => {
242
                    // Process doesn't have an item, it must be the ticker
243
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
244
                        *this.state = state;
245
                        Poll::Ready(Some(EItem::new(now, EData::Tick)))
246
                    } else {
247
                        *this.state = state;
248
                        Poll::Pending
249
                    }
250
                }
251
            },
252
            State::Done => {
253
                *this.state = state;
254
                Poll::Ready(None)
255
            }
256
        };
257
    }
8✔
258
}
259

260
// Tests /////////////////////////////////////////////////////////////
261

262
#[cfg(test)]
263
mod tests {
264
    use color_eyre::Result;
265
    use std::io;
266
    use tokio_stream::StreamExt;
267

268
    use crate::process_wrapper::Item;
269
    use crate::sys::SysVirtual;
270
    use crate::time_wrapper::Instant;
271

272
    use super::*;
273

274
    impl Engine<SysVirtual> {
275
        pub fn new_virtual(
2✔
276
            mut sys: SysVirtual,
2✔
277
            exit_on_success: bool,
2✔
278
            exit_on_failure: bool,
2✔
279
        ) -> Result<Self> {
2✔
280
            let user_stream = sys.user_stream();
2✔
281
            Ok(Self {
2✔
282
                sys,
2✔
283
                cmd: Cmd::default(),
2✔
284
                refresh: Duration::INFINITE,
2✔
285
                sleep: Duration::INFINITE,
2✔
286
                exit_on_success,
2✔
287
                exit_on_failure,
2✔
288
                state: State::Start,
2✔
289
                user: user_stream,
2✔
290
                exit_by_user: false,
2✔
291
            })
2✔
292
        }
2✔
293
    }
294

295
    #[tokio::test]
296
    async fn test_basic_success() -> Result<()> {
1✔
297
        let list = vec![
1✔
298
            Item::Stdout("stdout".into()),
1✔
299
            Item::Stderr("stderr".into()),
1✔
300
            Item::Done(Ok(ExitSts::default())),
1✔
301
        ];
302
        let mut sys = SysVirtual::default();
1✔
303
        sys.set_items(list.clone());
1✔
304
        let streamer = Engine::new_virtual(sys, true, true)?;
1✔
305
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
306
        let mut now = Instant::default();
1✔
307
        assert_eq!(
1✔
308
            streamed,
309
            vec![
1✔
310
                EItem {
1✔
311
                    time: now.incr(),
1✔
312
                    data: EData::StartRun
1✔
313
                },
1✔
314
                EItem {
1✔
315
                    time: now.incr(),
1✔
316
                    data: EData::LineOut("stdout".to_owned())
1✔
317
                },
1✔
318
                EItem {
1✔
319
                    time: now.incr(),
1✔
320
                    data: EData::LineErr("stderr".to_owned())
1✔
321
                },
1✔
322
                EItem {
1✔
323
                    time: now.incr(),
1✔
324
                    data: EData::Done(ExitSts::default())
1✔
325
                }
1✔
326
            ]
327
        );
328
        Ok(())
2✔
329
    }
1✔
330

331
    #[tokio::test]
332
    async fn test_done_err() -> Result<()> {
1✔
333
        let list = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
1✔
334
        let mut sys = SysVirtual::default();
1✔
335
        sys.set_items(list.clone());
1✔
336
        let streamer = Engine::new_virtual(sys, false, false)?;
1✔
337
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
338
        let mut now = Instant::default();
1✔
339
        assert_eq!(
1✔
340
            streamed,
341
            vec![
1✔
342
                EItem {
1✔
343
                    time: now.incr(),
1✔
344
                    data: EData::StartRun,
1✔
345
                },
1✔
346
                EItem {
1✔
347
                    time: now.incr(),
1✔
348
                    data: EData::Err(io::ErrorKind::UnexpectedEof)
1✔
349
                }
1✔
350
            ]
351
        );
352
        Ok(())
2✔
353
    }
1✔
354
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc