• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 21725554759

05 Feb 2026 06:59PM UTC coverage: 58.407% (-0.2%) from 58.629%
21725554759

push

github

lpenz
Cargo*: increment version to 2.3.2

462 of 791 relevant lines covered (58.41%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.02
/src/engine.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Main lower-level module that takes care of running the command and
6
//! yielding all possible events into a coherent stream of timestamped
7
//! events.
8

9
use color_eyre::Result;
10
use pin_project::pin_project;
11
use std::pin::Pin;
12
use std::task::{Context, Poll};
13
use tokio_stream::Stream;
14
use tokio_stream::wrappers::IntervalStream;
15
use tracing::instrument;
16

17
use crate::process_wrapper;
18
use crate::process_wrapper::Cmd;
19
use crate::process_wrapper::ExitSts;
20
use crate::process_wrapper::ProcessStream;
21
use crate::sys::SysApi;
22
use crate::time_wrapper::Duration;
23
use crate::time_wrapper::Instant;
24
use crate::user_wrapper::UserEvent;
25
use crate::user_wrapper::UserStream;
26

27
// EData, EItem //////////////////////////////////////////////////////
28

29
#[derive(Debug, Clone, PartialEq, Eq)]
30
/// Payload of an engine event: what happened, without the timestamp
/// (the timestamp is carried by [`EItem`]).
pub enum EData {
31
    /// The command was started.
    StartRun,
32
    /// The engine started sleeping; carries the wake-up deadline.
    StartSleep(Instant),
33
    /// A line of regular output: a process stdout line, or a message
    /// converted from a plain `String`.
    LineOut(String),
34
    /// A line coming from the process stderr.
    LineErr(String),
35
    /// The process finished and reported this exit status.
    Done(ExitSts),
36
    /// An I/O error, reduced to its kind.
    Err(std::io::ErrorKind),
37
    /// A timer tick, emitted while running or sleeping.
    Tick,
38
}
39

40
/// Maps each process item onto the matching engine payload.
impl From<process_wrapper::Item> for EData {
41
    /// Stdout/stderr lines become `LineOut`/`LineErr`; a finished process
    /// becomes `Done` on success of the wait and `Err` otherwise.
    fn from(item: process_wrapper::Item) -> Self {
3✔
42
        match item {
1✔
43
            process_wrapper::Item::Stdout(l) => EData::LineOut(l),
1✔
44
            process_wrapper::Item::Stderr(l) => EData::LineErr(l),
1✔
45
            process_wrapper::Item::Done(Ok(sts)) => EData::Done(sts),
1✔
46
            process_wrapper::Item::Done(Err(e)) => EData::Err(e),
×
47
        }
48
    }
3✔
49
}
50

51
/// Lets a plain `String` be used wherever an [`EData`] is expected.
impl From<String> for EData {
52
    /// Wraps the string as a regular output line (`LineOut`).
    fn from(s: String) -> Self {
×
53
        EData::LineOut(s)
×
54
    }
×
55
}
56

57
/// Lets an I/O error kind be used wherever an [`EData`] is expected.
impl From<std::io::ErrorKind> for EData {
58
    /// Wraps the error kind as `EData::Err`.
    fn from(e: std::io::ErrorKind) -> Self {
1✔
59
        EData::Err(e)
1✔
60
    }
1✔
61
}
62

63
/// Lets a full I/O error be used wherever an [`EData`] is expected.
impl From<std::io::Error> for EData {
64
    /// Keeps only the error's kind and converts that into `EData::Err`;
    /// any other detail carried by the error is dropped.
    fn from(e: std::io::Error) -> Self {
×
65
        e.kind().into()
×
66
    }
×
67
}
68

69
#[derive(Debug, Clone, PartialEq, Eq)]
70
/// A timestamped engine event; this is the item type of the engine stream.
pub struct EItem {
71
    /// When the event was observed.
    pub time: Instant,
72
    /// What happened.
    pub data: EData,
73
}
74

75
impl EItem {
76
    /// Builds an event stamped with `time`, converting `data` into an
    /// [`EData`] through its `Into` implementation.
    pub fn new<D>(time: Instant, data: D) -> EItem
6✔
77
    where
6✔
78
        D: Into<EData>,
6✔
79
    {
80
        Self {
6✔
81
            time,
6✔
82
            data: data.into(),
6✔
83
        }
6✔
84
    }
6✔
85
}
86

87
// Engine ////////////////////////////////////////////////////////////
88

89
#[derive(Debug, Default)]
90
/// Internal state machine of the engine; `Start` is the default.
enum State {
91
    /// State where we start the process on the next iteration.
    92
    #[default]
93
    Start,
94
    /// State where we enter the sleep period on the next iteration.
    StartSleeping,
95
    /// The process is running and we are yielding lines and ticks.
    96
    Running {
97
        /// Events coming from the running process
        98
        process: ProcessStream,
99
        /// Tick events generated by the [`IntervalStream`] timer
        100
        ticker: IntervalStream,
101
    },
102
    /// Sleeping between two process executions, yielding ticks.
    103
    Sleeping {
104
        /// When to wake up
        105
        deadline: Instant,
106
        /// Ticker built from the engine's `refresh` period
        107
        ticker: IntervalStream,
108
    },
109
    /// Don't execute the process again, either because of an exit
    110
    /// condition or an error.
    111
    Done,
112
}
113

114
#[pin_project]
115
#[derive(Default, Debug)]
116
/// Runs a command repeatedly and exposes everything that happens (process
/// output, exit status, sleeps, ticks, user requests) as a stream of
/// [`EItem`]s.
pub struct Engine<SI: SysApi> {
117
    /// System interface: provides the clock, the user stream and the
    /// ability to run the command.
    sys: SI,
118
    /// Command to execute; cloned for each run.
    cmd: Cmd,
119
    /// Period used to build the tick timers.
    refresh: Duration,
120
    /// How long to sleep between two executions.
    sleep: Duration,
121
    /// Stop after a run that exits with success.
    exit_on_success: bool,
122
    /// Stop after a run that exits with failure.
    exit_on_failure: bool,
123
    /// Current state of the state machine.
    state: State,
124
    /// Stream of user events, if any; cleared once that stream ends.
    user: Option<UserStream>,
125
    /// Set once the user asked to quit or kill.
    exit_by_user: bool,
126
}
127

128
impl<SI: SysApi> Engine<SI> {
129
    /// Creates an engine in the `Start` state.
    ///
    /// The user event stream is obtained from `sys` up front. As written,
    /// this constructor always returns `Ok`.
    pub fn new(
×
130
        mut sys: SI,
×
131
        cmd: Cmd,
×
132
        refresh: Duration,
×
133
        sleep: Duration,
×
134
        exit_on_success: bool,
×
135
        exit_on_failure: bool,
×
136
    ) -> Result<Self> {
×
137
        let user_stream = sys.user_stream();
×
138
        Ok(Self {
×
139
            sys,
×
140
            cmd,
×
141
            refresh,
×
142
            sleep,
×
143
            exit_on_success,
×
144
            exit_on_failure,
×
145
            state: State::Start,
×
146
            user: user_stream,
×
147
            exit_by_user: false,
×
148
        })
×
149
    }
×
150

151
    /// Starts the command and switches to the `Running` state with a fresh
    /// ticker built from `refresh`.
    ///
    /// If the command cannot be started the error is returned and
    /// `self.state` is not modified.
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
152
        let process = self.sys.run_command(self.cmd.clone())?;
2✔
153
        let ticker = IntervalStream::new(self.refresh.into());
2✔
154
        self.state = State::Running { process, ticker };
2✔
155
        Ok(())
2✔
156
    }
2✔
157

158
    /// Switches to the `Sleeping` state with a deadline of `now` plus the
    /// configured sleep duration, and returns the `StartSleep` event that
    /// announces it.
    fn sleep(&mut self, now: Instant) -> EItem {
×
159
        let deadline = &now + &self.sleep;
×
160
        let ticker = IntervalStream::new(self.refresh.into());
×
161
        self.state = State::Sleeping { deadline, ticker };
×
162
        EItem::new(now, EData::StartSleep(deadline))
×
163
    }
×
164
}
165

166
impl<SI: SysApi> Stream for Engine<SI> {
167
    type Item = EItem;
168

169
    #[instrument(level = "debug", ret, skip(cx))]
170
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8✔
171
        let this = self.as_mut().project();
172
        let now = this.sys.now();
173
        if let Some(user) = this.user {
174
            match Pin::new(user).poll_next(cx) {
175
                Poll::Ready(Some(UserEvent::Quit)) => {
176
                    *this.exit_by_user = true;
177
                    return Poll::Ready(Some(EItem::new(now, ofmt!(&now, "user exit, graceful"))));
178
                }
179
                Poll::Ready(Some(UserEvent::Kill)) => {
180
                    if let State::Running { process, ticker: _ } = this.state
181
                        && let Some(child) = process.child_mut()
182
                    {
183
                        let _ = child.start_kill();
184
                    }
185
                    *this.exit_by_user = true;
186
                    return Poll::Ready(Some(EItem::new(now, ofmt!(&now, "user exit, forced"))));
187
                }
188
                Poll::Ready(None) => {
189
                    *this.user = None;
190
                }
191
                Poll::Pending => {}
192
            };
193
        }
194
        let mut state = std::mem::take(&mut *this.state);
195
        return match state {
196
            State::Start => match self.run() {
197
                Ok(_) => Poll::Ready(Some(EItem::new(now, EData::StartRun))),
198
                Err(e) => Poll::Ready(Some(EItem::new(now, e))),
199
            },
200
            State::StartSleeping => {
201
                let item = self.sleep(now);
202
                Poll::Ready(Some(item))
203
            }
204
            State::Sleeping {
205
                deadline,
206
                ref mut ticker,
207
            } => {
208
                if *this.exit_by_user {
209
                    *this.state = State::Done;
210
                    Poll::Ready(None)
211
                } else if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
212
                    let tick = EData::Tick;
213
                    if now < deadline {
214
                        *this.state = state;
215
                        Poll::Ready(Some(EItem::new(now, tick)))
216
                    } else {
217
                        *this.state = State::Start;
218
                        Poll::Ready(Some(EItem::new(now, tick)))
219
                    }
220
                } else {
221
                    *this.state = state;
222
                    Poll::Pending
223
                }
224
            }
225
            State::Running {
226
                ref mut process,
227
                ref mut ticker,
228
            } => match Pin::new(process).poll_next(cx) {
229
                Poll::Ready(Some(item)) => match item {
230
                    process_wrapper::Item::Stdout(_) => {
231
                        *this.state = state;
232
                        Poll::Ready(Some(EItem::new(now, item)))
233
                    }
234
                    process_wrapper::Item::Stderr(_) => {
235
                        *this.state = state;
236
                        Poll::Ready(Some(EItem::new(now, item)))
237
                    }
238
                    process_wrapper::Item::Done(Ok(ref exitsts)) => {
239
                        let success = exitsts.success();
240
                        if *this.exit_by_user
241
                            || success && *this.exit_on_success
242
                            || !success && *this.exit_on_failure
243
                        {
244
                            *this.state = State::Done;
245
                        } else {
246
                            *this.state = State::StartSleeping;
247
                        }
248
                        Poll::Ready(Some(EItem::new(now, item)))
249
                    }
250
                    process_wrapper::Item::Done(Err(e)) => {
251
                        *this.state = State::Done;
252
                        Poll::Ready(Some(EItem::new(now, e)))
253
                    }
254
                },
255
                Poll::Ready(None) => {
256
                    #[cfg(not(test))]
257
                    panic!("We should never see the underlying stream end");
258
                    #[cfg(test)]
259
                    {
260
                        *this.state = State::Done;
261
                        Poll::Ready(None)
262
                    }
263
                }
264
                Poll::Pending => {
265
                    // Process doesn't have an item, it must be the ticker
266
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
267
                        *this.state = state;
268
                        Poll::Ready(Some(EItem::new(now, EData::Tick)))
269
                    } else {
270
                        *this.state = state;
271
                        Poll::Pending
272
                    }
273
                }
274
            },
275
            State::Done => {
276
                *this.state = state;
277
                Poll::Ready(None)
278
            }
279
        };
280
    }
8✔
281
}
282

283
// Tests /////////////////////////////////////////////////////////////
284

285
#[cfg(test)]
286
// Unit tests driving the engine through the virtual system implementation.
mod tests {
287
    use color_eyre::Result;
288
    use std::io;
289
    use tokio_stream::StreamExt;
290

291
    use crate::process_wrapper::Item;
292
    use crate::sys::SysVirtual;
293
    use crate::time_wrapper::Instant;
294

295
    use super::*;
296

297
    impl Engine<SysVirtual> {
298
        /// Test-only constructor: an engine over [`SysVirtual`] with a
        /// default command and infinite refresh and sleep durations.
        pub fn new_virtual(
2✔
299
            mut sys: SysVirtual,
2✔
300
            exit_on_success: bool,
2✔
301
            exit_on_failure: bool,
2✔
302
        ) -> Result<Self> {
2✔
303
            let user_stream = sys.user_stream();
2✔
304
            Ok(Self {
2✔
305
                sys,
2✔
306
                cmd: Cmd::default(),
2✔
307
                refresh: Duration::INFINITE,
2✔
308
                sleep: Duration::INFINITE,
2✔
309
                exit_on_success,
2✔
310
                exit_on_failure,
2✔
311
                state: State::Start,
2✔
312
                user: user_stream,
2✔
313
                exit_by_user: false,
2✔
314
            })
2✔
315
        }
2✔
316
    }
317

318
    /// A run that prints one stdout line, one stderr line and then exits
    /// with the default status yields `StartRun`, both lines and `Done`,
    /// after which the stream ends.
    #[tokio::test]
319
    async fn test_basic_success() -> Result<()> {
1✔
320
        let list = vec![
1✔
321
            Item::Stdout("stdout".into()),
1✔
322
            Item::Stderr("stderr".into()),
1✔
323
            Item::Done(Ok(ExitSts::default())),
1✔
324
        ];
325
        let mut sys = SysVirtual::default();
1✔
326
        // Presumably these are the items the virtual process stream will
        // replay - confirm against `SysVirtual`.
        sys.set_items(list.clone());
1✔
327
        let streamer = Engine::new_virtual(sys, true, true)?;
1✔
328
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
329
        // NOTE(review): the expected timestamps assume the virtual clock
        // advances by one `incr()` step per event - confirm.
        let mut now = Instant::default();
1✔
330
        assert_eq!(
1✔
331
            streamed,
332
            vec![
1✔
333
                EItem {
1✔
334
                    time: now.incr(),
1✔
335
                    data: EData::StartRun
1✔
336
                },
1✔
337
                EItem {
1✔
338
                    time: now.incr(),
1✔
339
                    data: EData::LineOut("stdout".to_owned())
1✔
340
                },
1✔
341
                EItem {
1✔
342
                    time: now.incr(),
1✔
343
                    data: EData::LineErr("stderr".to_owned())
1✔
344
                },
1✔
345
                EItem {
1✔
346
                    time: now.incr(),
1✔
347
                    data: EData::Done(ExitSts::default())
1✔
348
                }
1✔
349
            ]
350
        );
351
        Ok(())
2✔
352
    }
1✔
353

354
    /// A process that finishes with an I/O error yields `StartRun` followed
    /// by `Err`, and the stream ends even with both exit flags disabled.
    #[tokio::test]
355
    async fn test_done_err() -> Result<()> {
1✔
356
        let list = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
1✔
357
        let mut sys = SysVirtual::default();
1✔
358
        sys.set_items(list.clone());
1✔
359
        let streamer = Engine::new_virtual(sys, false, false)?;
1✔
360
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
361
        let mut now = Instant::default();
1✔
362
        assert_eq!(
1✔
363
            streamed,
364
            vec![
1✔
365
                EItem {
1✔
366
                    time: now.incr(),
1✔
367
                    data: EData::StartRun,
1✔
368
                },
1✔
369
                EItem {
1✔
370
                    time: now.incr(),
1✔
371
                    data: EData::Err(io::ErrorKind::UnexpectedEof)
1✔
372
                }
1✔
373
            ]
374
        );
375
        Ok(())
2✔
376
    }
1✔
377
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc