• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 16728891831

04 Aug 2025 04:36PM UTC coverage: 62.568% (+1.4%) from 61.166%
16728891831

push

github

lpenz
Cargo.*: increment version to 2.1.10

458 of 732 relevant lines covered (62.57%)

1.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.8
/src/engine.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Main lower-level module that takes care of running the command and
6
//! yielding all possible events into a coherent stream of timestamped
7
//! events.
8

9
use color_eyre::Result;
10
use pin_project::pin_project;
11
use std::pin::Pin;
12
use std::task::{Context, Poll};
13
use tokio_stream::Stream;
14
use tokio_stream::wrappers::IntervalStream;
15
use tracing::instrument;
16

17
use crate::process_wrapper;
18
use crate::process_wrapper::Cmd;
19
use crate::process_wrapper::ExitSts;
20
use crate::process_wrapper::ProcessStream;
21
use crate::sys::SysApi;
22
use crate::time_wrapper::Duration;
23
use crate::time_wrapper::Instant;
24
use crate::user_wrapper::UserStream;
25

26
// EData, EItem //////////////////////////////////////////////////////
27

28
#[derive(Debug, Clone, PartialEq, Eq)]
29
pub enum EData {
30
    StartRun,
31
    StartSleep(Instant),
32
    LineOut(String),
33
    LineErr(String),
34
    Done(ExitSts),
35
    Err(std::io::ErrorKind),
36
    Tick,
37
}
38

39
impl From<process_wrapper::Item> for EData {
40
    fn from(item: process_wrapper::Item) -> Self {
3✔
41
        match item {
1✔
42
            process_wrapper::Item::Stdout(l) => EData::LineOut(l),
1✔
43
            process_wrapper::Item::Stderr(l) => EData::LineErr(l),
1✔
44
            process_wrapper::Item::Done(Ok(sts)) => EData::Done(sts),
1✔
45
            process_wrapper::Item::Done(Err(e)) => EData::Err(e),
×
46
        }
47
    }
3✔
48
}
49

50
impl From<std::io::ErrorKind> for EData {
51
    fn from(e: std::io::ErrorKind) -> Self {
1✔
52
        EData::Err(e)
1✔
53
    }
1✔
54
}
55

56
impl From<std::io::Error> for EData {
57
    fn from(e: std::io::Error) -> Self {
×
58
        e.kind().into()
×
59
    }
×
60
}
61

62
#[derive(Debug, Clone, PartialEq, Eq)]
63
pub struct EItem {
64
    pub time: Instant,
65
    pub data: EData,
66
}
67

68
impl EItem {
69
    pub fn new<D>(time: Instant, data: D) -> EItem
6✔
70
    where
6✔
71
        D: Into<EData>,
6✔
72
    {
73
        Self {
6✔
74
            time,
6✔
75
            data: data.into(),
6✔
76
        }
6✔
77
    }
6✔
78
}
79

80
// Engine ////////////////////////////////////////////////////////////
81

82
#[derive(Debug, Default)]
83
enum State {
84
    /// State where we start the process on the next iteration.
85
    #[default]
86
    Start,
87
    StartSleeping,
88
    /// The process is running and we are yielding lines and ticks.
89
    Running {
90
        /// Events coming from the running process
91
        process: ProcessStream,
92
        /// Tick events generated by the [`IntervalStream`] timer
93
        ticker: IntervalStream,
94
    },
95
    /// Sleeping between two process executions, yielding ticks.
96
    Sleeping {
97
        /// When to wake up
98
        deadline: Instant,
99
        /// 1s ticker
100
        ticker: IntervalStream,
101
    },
102
    /// Don't execute the process again, either because of an exit
103
    /// condition or an error.
104
    Done,
105
}
106

107
#[pin_project]
108
#[derive(Default, Debug)]
109
pub struct Engine<SI: SysApi> {
110
    sys: SI,
111
    cmd: Cmd,
112
    refresh: Duration,
113
    sleep: Duration,
114
    exit_on_success: bool,
115
    exit_on_failure: bool,
116
    state: State,
117
    user: Option<UserStream>,
118
    exit_by_user: bool,
119
}
120

121
impl<SI: SysApi> Engine<SI> {
122
    pub fn new(
×
123
        mut sys: SI,
×
124
        cmd: Cmd,
×
125
        refresh: Duration,
×
126
        sleep: Duration,
×
127
        exit_on_success: bool,
×
128
        exit_on_failure: bool,
×
129
    ) -> Result<Self> {
×
130
        let user_stream = sys.user_stream();
×
131
        Ok(Self {
×
132
            sys,
×
133
            cmd,
×
134
            refresh,
×
135
            sleep,
×
136
            exit_on_success,
×
137
            exit_on_failure,
×
138
            state: State::Start,
×
139
            user: user_stream,
×
140
            exit_by_user: false,
×
141
        })
×
142
    }
×
143

144
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
2✔
145
        let process = self.sys.run_command(self.cmd.clone())?;
2✔
146
        let ticker = IntervalStream::new(self.refresh.into());
2✔
147
        self.state = State::Running { process, ticker };
2✔
148
        Ok(())
2✔
149
    }
2✔
150

151
    fn sleep(&mut self, now: Instant) -> EItem {
×
152
        let deadline = &now + &self.sleep;
×
153
        let ticker = IntervalStream::new(Duration::seconds(1).into());
×
154
        self.state = State::Sleeping { deadline, ticker };
×
155
        EItem::new(now, EData::StartSleep(deadline))
×
156
    }
×
157
}
158

159
impl<SI: SysApi> Stream for Engine<SI> {
160
    type Item = EItem;
161

162
    #[instrument(level = "debug", ret, skip(cx))]
163
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8✔
164
        let this = self.as_mut().project();
165
        let now = this.sys.now();
166
        if let Some(user) = this.user {
167
            match Pin::new(user).poll_next(cx) {
168
                Poll::Ready(Some(s)) if s == "q" => {
169
                    *this.exit_by_user = true;
170
                    *this.user = None;
171
                }
172
                Poll::Ready(Some(s)) if s == "k" => {
173
                    if let State::Running { process, ticker: _ } = this.state {
174
                        if let Some(child) = process.child_mut() {
175
                            let _ = child.start_kill();
176
                        }
177
                    }
178
                    *this.exit_by_user = true;
179
                    *this.user = None;
180
                }
181
                Poll::Ready(Some(_)) => {}
182
                Poll::Ready(None) => {
183
                    *this.user = None;
184
                }
185
                Poll::Pending => {}
186
            };
187
        }
188
        let mut state = std::mem::take(&mut *this.state);
189
        return match state {
190
            State::Start => match self.run() {
191
                Ok(_) => Poll::Ready(Some(EItem::new(now, EData::StartRun))),
192
                Err(e) => Poll::Ready(Some(EItem::new(now, e))),
193
            },
194
            State::StartSleeping => {
195
                let item = self.sleep(now);
196
                Poll::Ready(Some(item))
197
            }
198
            State::Sleeping {
199
                deadline,
200
                ref mut ticker,
201
            } => {
202
                if *this.exit_by_user {
203
                    *this.state = State::Done;
204
                    Poll::Ready(None)
205
                } else if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
206
                    let tick = EData::Tick;
207
                    if now < deadline {
208
                        *this.state = state;
209
                        Poll::Ready(Some(EItem::new(now, tick)))
210
                    } else {
211
                        *this.state = State::Start;
212
                        Poll::Ready(Some(EItem::new(now, tick)))
213
                    }
214
                } else {
215
                    *this.state = state;
216
                    Poll::Pending
217
                }
218
            }
219
            State::Running {
220
                ref mut process,
221
                ref mut ticker,
222
            } => match Pin::new(process).poll_next(cx) {
223
                Poll::Ready(Some(item)) => match item {
224
                    process_wrapper::Item::Stdout(_) => {
225
                        *this.state = state;
226
                        Poll::Ready(Some(EItem::new(now, item)))
227
                    }
228
                    process_wrapper::Item::Stderr(_) => {
229
                        *this.state = state;
230
                        Poll::Ready(Some(EItem::new(now, item)))
231
                    }
232
                    process_wrapper::Item::Done(Ok(ref exitsts)) => {
233
                        let success = exitsts.success();
234
                        if *this.exit_by_user
235
                            || success && *this.exit_on_success
236
                            || !success && *this.exit_on_failure
237
                        {
238
                            *this.state = State::Done;
239
                        } else {
240
                            *this.state = State::StartSleeping;
241
                        }
242
                        Poll::Ready(Some(EItem::new(now, item)))
243
                    }
244
                    process_wrapper::Item::Done(Err(e)) => {
245
                        *this.state = State::Done;
246
                        Poll::Ready(Some(EItem::new(now, e)))
247
                    }
248
                },
249
                Poll::Ready(None) => {
250
                    #[cfg(not(test))]
251
                    panic!("We should never see the underlying stream end");
252
                    #[cfg(test)]
253
                    {
254
                        *this.state = State::Done;
255
                        Poll::Ready(None)
256
                    }
257
                }
258
                Poll::Pending => {
259
                    // Process doesn't have an item, it must be the ticker
260
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
261
                        *this.state = state;
262
                        Poll::Ready(Some(EItem::new(now, EData::Tick)))
263
                    } else {
264
                        *this.state = state;
265
                        Poll::Pending
266
                    }
267
                }
268
            },
269
            State::Done => {
270
                *this.state = state;
271
                Poll::Ready(None)
272
            }
273
        };
274
    }
8✔
275
}
276

277
// Tests /////////////////////////////////////////////////////////////
278

279
#[cfg(test)]
280
mod tests {
281
    use color_eyre::Result;
282
    use std::io;
283
    use tokio_stream::StreamExt;
284

285
    use crate::process_wrapper::Item;
286
    use crate::sys::SysVirtual;
287
    use crate::time_wrapper::Instant;
288

289
    use super::*;
290

291
    impl Engine<SysVirtual> {
292
        pub fn new_virtual(
2✔
293
            mut sys: SysVirtual,
2✔
294
            exit_on_success: bool,
2✔
295
            exit_on_failure: bool,
2✔
296
        ) -> Result<Self> {
2✔
297
            let user_stream = sys.user_stream();
2✔
298
            Ok(Self {
2✔
299
                sys,
2✔
300
                cmd: Cmd::default(),
2✔
301
                refresh: Duration::INFINITE,
2✔
302
                sleep: Duration::INFINITE,
2✔
303
                exit_on_success,
2✔
304
                exit_on_failure,
2✔
305
                state: State::Start,
2✔
306
                user: user_stream,
2✔
307
                exit_by_user: false,
2✔
308
            })
2✔
309
        }
2✔
310
    }
311

312
    #[tokio::test]
313
    async fn test_basic_success() -> Result<()> {
1✔
314
        let list = vec![
1✔
315
            Item::Stdout("stdout".into()),
1✔
316
            Item::Stderr("stderr".into()),
1✔
317
            Item::Done(Ok(ExitSts::default())),
1✔
318
        ];
319
        let mut sys = SysVirtual::default();
1✔
320
        sys.set_items(list.clone());
1✔
321
        let streamer = Engine::new_virtual(sys, true, true)?;
1✔
322
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
323
        let mut now = Instant::default();
1✔
324
        assert_eq!(
1✔
325
            streamed,
326
            vec![
1✔
327
                EItem {
1✔
328
                    time: now.incr(),
1✔
329
                    data: EData::StartRun
1✔
330
                },
1✔
331
                EItem {
1✔
332
                    time: now.incr(),
1✔
333
                    data: EData::LineOut("stdout".to_owned())
1✔
334
                },
1✔
335
                EItem {
1✔
336
                    time: now.incr(),
1✔
337
                    data: EData::LineErr("stderr".to_owned())
1✔
338
                },
1✔
339
                EItem {
1✔
340
                    time: now.incr(),
1✔
341
                    data: EData::Done(ExitSts::default())
1✔
342
                }
1✔
343
            ]
344
        );
345
        Ok(())
2✔
346
    }
1✔
347

348
    #[tokio::test]
349
    async fn test_done_err() -> Result<()> {
1✔
350
        let list = vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))];
1✔
351
        let mut sys = SysVirtual::default();
1✔
352
        sys.set_items(list.clone());
1✔
353
        let streamer = Engine::new_virtual(sys, false, false)?;
1✔
354
        let streamed = streamer.collect::<Vec<_>>().await;
1✔
355
        let mut now = Instant::default();
1✔
356
        assert_eq!(
1✔
357
            streamed,
358
            vec![
1✔
359
                EItem {
1✔
360
                    time: now.incr(),
1✔
361
                    data: EData::StartRun,
1✔
362
                },
1✔
363
                EItem {
1✔
364
                    time: now.incr(),
1✔
365
                    data: EData::Err(io::ErrorKind::UnexpectedEof)
1✔
366
                }
1✔
367
            ]
368
        );
369
        Ok(())
2✔
370
    }
1✔
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc