• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 21967796932

12 Feb 2026 10:59PM UTC coverage: 60.289% (+1.9%) from 58.407%
21967796932

push

github

lpenz
Cargo.*: increment version to 2.3.3

501 of 831 relevant lines covered (60.29%)

1.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.15
/src/engine.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Main lower-level module that takes care of running the command and
//! merging every event it can produce into a single coherent stream of
//! timestamped events.
8

9
use color_eyre::Result;
10
use pin_project::pin_project;
11
use std::pin::Pin;
12
use std::task::{Context, Poll};
13
use tokio_stream::Stream;
14
use tokio_stream::wrappers::IntervalStream;
15
use tracing::instrument;
16

17
use crate::process_wrapper;
18
use crate::process_wrapper::Cmd;
19
use crate::process_wrapper::ExitSts;
20
use crate::process_wrapper::ProcessStream;
21
use crate::sys::SysApi;
22
use crate::time_wrapper::Duration;
23
use crate::time_wrapper::Instant;
24
use crate::user_wrapper::UserEvent;
25
use crate::user_wrapper::UserStream;
26

27
// EData, EItem //////////////////////////////////////////////////////
28

29
#[derive(Debug, Clone, PartialEq, Eq)]
30
pub enum EData {
31
    StartRun,
32
    StartSleep(Instant),
33
    LineOut(String),
34
    LineErr(String),
35
    Msg(String),
36
    Done(ExitSts),
37
    Err(std::io::ErrorKind),
38
    Tick,
39
}
40

41
impl From<process_wrapper::Item> for EData {
    /// Maps an item of the process stream onto the matching engine event:
    /// output lines keep their channel, and the final result becomes
    /// either [`EData::Done`] or [`EData::Err`].
    fn from(item: process_wrapper::Item) -> Self {
        match item {
            process_wrapper::Item::Stdout(line) => Self::LineOut(line),
            process_wrapper::Item::Stderr(line) => Self::LineErr(line),
            process_wrapper::Item::Done(outcome) => match outcome {
                Ok(status) => Self::Done(status),
                Err(kind) => Self::Err(kind),
            },
        }
    }
}
51

52
impl From<String> for EData {
53
    fn from(s: String) -> Self {
×
54
        EData::LineOut(s)
×
55
    }
×
56
}
57

58
impl From<std::io::ErrorKind> for EData {
59
    fn from(e: std::io::ErrorKind) -> Self {
1✔
60
        EData::Err(e)
1✔
61
    }
1✔
62
}
63

64
impl From<std::io::Error> for EData {
65
    fn from(e: std::io::Error) -> Self {
×
66
        e.kind().into()
×
67
    }
×
68
}
69

70
#[derive(Debug, Clone, PartialEq, Eq)]
71
pub struct EItem {
72
    pub time: Instant,
73
    pub data: EData,
74
}
75

76
impl EItem {
77
    pub fn new<D>(time: Instant, data: D) -> EItem
6✔
78
    where
6✔
79
        D: Into<EData>,
6✔
80
    {
81
        Self {
6✔
82
            time,
6✔
83
            data: data.into(),
6✔
84
        }
6✔
85
    }
6✔
86

87
    pub fn msg(time: Instant, msg: String) -> EItem {
×
88
        Self {
×
89
            time,
×
90
            data: EData::Msg(msg),
×
91
        }
×
92
    }
×
93
}
94

95
// Engine ////////////////////////////////////////////////////////////
96

97
#[derive(Debug, Default)]
98
enum State {
99
    /// State where we start the process on the next iteration.
100
    #[default]
101
    Start,
102
    StartSleeping,
103
    /// The process is running and we are yielding lines and ticks.
104
    Running {
105
        /// Events coming from the running process
106
        process: ProcessStream,
107
        /// Tick events generated by the [`IntervalStream`] timer
108
        ticker: IntervalStream,
109
    },
110
    /// Sleeping between two process executions, yielding ticks.
111
    Sleeping {
112
        /// When to wake up
113
        deadline: Instant,
114
        /// 1s ticker
115
        ticker: IntervalStream,
116
    },
117
    /// Don't execute the process again, either because of an exit
118
    /// condition or an error.
119
    Done,
120
}
121

122
impl State {
123
    pub fn is_running(&self) -> bool {
×
124
        matches!(self, State::Running { .. })
×
125
    }
×
126
}
127

128
#[pin_project]
129
#[derive(Default, Debug)]
130
pub struct Engine<SI: SysApi> {
131
    sys: SI,
132
    cmd: Cmd,
133
    refresh: Duration,
134
    sleep: Duration,
135
    exit_on_success: bool,
136
    exit_on_failure: bool,
137
    state: State,
138
    user: Option<UserStream>,
139
    exit_by_user: bool,
140
}
141

142
impl<SI: SysApi> Engine<SI> {
143
    pub fn new(
×
144
        mut sys: SI,
×
145
        cmd: Cmd,
×
146
        refresh: Duration,
×
147
        sleep: Duration,
×
148
        exit_on_success: bool,
×
149
        exit_on_failure: bool,
×
150
    ) -> Result<Self> {
×
151
        let user_stream = sys.user_stream();
×
152
        Ok(Self {
×
153
            sys,
×
154
            cmd,
×
155
            refresh,
×
156
            sleep,
×
157
            exit_on_success,
×
158
            exit_on_failure,
×
159
            state: State::Start,
×
160
            user: user_stream,
×
161
            exit_by_user: false,
×
162
        })
×
163
    }
×
164

165
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
166
        let process = self.sys.run_command(self.cmd.clone())?;
2✔
167
        let ticker = IntervalStream::new(self.refresh.into());
2✔
168
        self.state = State::Running { process, ticker };
2✔
169
        Ok(())
2✔
170
    }
2✔
171

172
    fn sleep(&mut self, now: Instant) -> EItem {
×
173
        let deadline = &now + &self.sleep;
×
174
        let ticker = IntervalStream::new(self.refresh.into());
×
175
        self.state = State::Sleeping { deadline, ticker };
×
176
        EItem::new(now, EData::StartSleep(deadline))
×
177
    }
×
178
}
179

180
impl<SI: SysApi> Stream for Engine<SI> {
181
    type Item = EItem;
182

183
    #[instrument(level = "debug", ret, skip(cx))]
184
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8✔
185
        let this = self.as_mut().project();
186
        let now = this.sys.now();
187
        if let Some(user) = this.user {
188
            match Pin::new(user).poll_next(cx) {
189
                Poll::Ready(Some(UserEvent::Quit)) => {
190
                    if !*this.exit_by_user {
191
                        *this.exit_by_user = true;
192
                        return if this.state.is_running() {
193
                            Poll::Ready(Some(EItem::msg(now, "user exit, graceful".to_string())))
194
                        } else {
195
                            Poll::Ready(Some(EItem::msg(now, "user exit".to_string())))
196
                        };
197
                    }
198
                }
199
                Poll::Ready(Some(UserEvent::Kill)) => {
200
                    *this.exit_by_user = true;
201
                    if let State::Running { process, ticker: _ } = this.state
202
                        && let Some(child) = process.child_mut()
203
                    {
204
                        let _ = child.start_kill();
205
                        return Poll::Ready(Some(EItem::msg(now, "user exit, forced".to_string())));
206
                    } else {
207
                        return Poll::Ready(Some(EItem::msg(now, "user exit".to_string())));
208
                    }
209
                }
210
                Poll::Ready(None) => {
211
                    *this.user = None;
212
                }
213
                Poll::Pending => {}
214
            };
215
        }
216
        let mut state = std::mem::take(&mut *this.state);
217
        return match state {
218
            State::Start => match self.run() {
219
                Ok(_) => Poll::Ready(Some(EItem::new(now, EData::StartRun))),
220
                Err(e) => Poll::Ready(Some(EItem::new(now, e))),
221
            },
222
            State::StartSleeping => {
223
                let item = self.sleep(now);
224
                Poll::Ready(Some(item))
225
            }
226
            State::Sleeping {
227
                deadline,
228
                ref mut ticker,
229
            } => {
230
                if *this.exit_by_user {
231
                    *this.state = State::Done;
232
                    Poll::Ready(None)
233
                } else if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
234
                    let tick = EData::Tick;
235
                    if now < deadline {
236
                        *this.state = state;
237
                        Poll::Ready(Some(EItem::new(now, tick)))
238
                    } else {
239
                        *this.state = State::Start;
240
                        Poll::Ready(Some(EItem::new(now, tick)))
241
                    }
242
                } else {
243
                    *this.state = state;
244
                    Poll::Pending
245
                }
246
            }
247
            State::Running {
248
                ref mut process,
249
                ref mut ticker,
250
            } => match Pin::new(process).poll_next(cx) {
251
                Poll::Ready(Some(item)) => match item {
252
                    process_wrapper::Item::Stdout(_) => {
253
                        *this.state = state;
254
                        Poll::Ready(Some(EItem::new(now, item)))
255
                    }
256
                    process_wrapper::Item::Stderr(_) => {
257
                        *this.state = state;
258
                        Poll::Ready(Some(EItem::new(now, item)))
259
                    }
260
                    process_wrapper::Item::Done(Ok(ref exitsts)) => {
261
                        let success = exitsts.success();
262
                        if *this.exit_by_user
263
                            || success && *this.exit_on_success
264
                            || !success && *this.exit_on_failure
265
                        {
266
                            *this.state = State::Done;
267
                        } else {
268
                            *this.state = State::StartSleeping;
269
                        }
270
                        Poll::Ready(Some(EItem::new(now, item)))
271
                    }
272
                    process_wrapper::Item::Done(Err(e)) => {
273
                        *this.state = State::Done;
274
                        Poll::Ready(Some(EItem::new(now, e)))
275
                    }
276
                },
277
                Poll::Ready(None) => {
278
                    #[cfg(not(test))]
279
                    panic!("We should never see the underlying stream end");
280
                    #[cfg(test)]
281
                    {
282
                        *this.state = State::Done;
283
                        Poll::Ready(None)
284
                    }
285
                }
286
                Poll::Pending => {
287
                    // Process doesn't have an item, it must be the ticker
288
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
289
                        *this.state = state;
290
                        Poll::Ready(Some(EItem::new(now, EData::Tick)))
291
                    } else {
292
                        *this.state = state;
293
                        Poll::Pending
294
                    }
295
                }
296
            },
297
            State::Done => {
298
                *this.state = state;
299
                Poll::Ready(None)
300
            }
301
        };
302
    }
8✔
303
}
304

305
// Tests /////////////////////////////////////////////////////////////
306

307
#[cfg(test)]
mod tests {
    use color_eyre::Result;
    use std::io;
    use tokio_stream::StreamExt;

    use crate::process_wrapper::Item;
    use crate::sys::SysVirtual;
    use crate::time_wrapper::Instant;

    use super::*;

    impl Engine<SysVirtual> {
        /// Test-only constructor: default command, and
        /// `Duration::INFINITE` for both the refresh and sleep periods.
        pub fn new_virtual(
            mut sys: SysVirtual,
            exit_on_success: bool,
            exit_on_failure: bool,
        ) -> Result<Self> {
            let user = sys.user_stream();
            Ok(Self {
                sys,
                cmd: Cmd::default(),
                refresh: Duration::INFINITE,
                sleep: Duration::INFINITE,
                exit_on_success,
                exit_on_failure,
                state: State::Start,
                user,
                exit_by_user: false,
            })
        }
    }

    /// Loads `script` into a virtual system, runs an engine on top of it
    /// and returns every item the engine yielded.
    async fn drain_engine(
        script: Vec<Item>,
        exit_on_success: bool,
        exit_on_failure: bool,
    ) -> Result<Vec<EItem>> {
        let mut sys = SysVirtual::default();
        sys.set_items(script);
        let engine = Engine::new_virtual(sys, exit_on_success, exit_on_failure)?;
        Ok(engine.collect::<Vec<_>>().await)
    }

    #[tokio::test]
    async fn test_basic_success() -> Result<()> {
        let script = vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitSts::default())),
        ];
        let streamed = drain_engine(script, true, true).await?;
        let mut clock = Instant::default();
        let expected = vec![
            EItem::new(clock.incr(), EData::StartRun),
            EItem::new(clock.incr(), EData::LineOut("stdout".to_owned())),
            EItem::new(clock.incr(), EData::LineErr("stderr".to_owned())),
            EItem::new(clock.incr(), EData::Done(ExitSts::default())),
        ];
        assert_eq!(streamed, expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_done_err() -> Result<()> {
        let script = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
        let streamed = drain_engine(script, false, false).await?;
        let mut clock = Instant::default();
        let expected = vec![
            EItem::new(clock.incr(), EData::StartRun),
            EItem::new(clock.incr(), EData::Err(io::ErrorKind::UnexpectedEof)),
        ];
        assert_eq!(streamed, expected);
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc