• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 14693599884

27 Apr 2025 03:33PM UTC coverage: 70.713%. Remained the same
14693599884

push

github

lpenz
Add tracing.instrument to ProcessStream and InputStream

0 of 19 new or added lines in 2 files covered. (0.0%)

9 existing lines in 2 files now uncovered.

367 of 519 relevant lines covered (70.71%)

1.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.21
/src/input_stream.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use color_eyre::Result;
6
use pin_project::pin_project;
7
use std::pin::Pin;
8
use std::process::ExitStatus;
9
use std::task::{Context, Poll};
10
use tokio_stream::wrappers::IntervalStream;
11
use tokio_stream::Stream;
12
use tracing::instrument;
13

14
use crate::sys_input;
15
use crate::sys_input::Cmd;
16
use crate::sys_input::ProcessStream;
17
use crate::sys_input::SysInputApi;
18
use crate::time_wrapper::Duration;
19
use crate::time_wrapper::Instant;
20

21
#[cfg(test)]
22
use crate::sys_input::SysInputVirtual;
23

24
/// Payload of a single event yielded by [`InputStream`].
///
/// Values are produced by the `From` conversions in this file (from
/// `sys_input::Item`, `tokio::time::Instant` and `std::io::Error`) or built
/// directly by `InputStream::poll_next`.
#[derive(Debug, Clone, PartialEq, Eq)]
25
pub enum InputData {
26
    /// Yielded by `poll_next` right after `run_command` succeeded.
    Start,
27
    /// A line carried by `sys_input::Item::Stdout`.
    LineOut(String),
28
    /// A line carried by `sys_input::Item::Stderr`.
    LineErr(String),
29
    /// Exit status carried by `sys_input::Item::Done(Ok(_))`.
    Done(ExitStatus),
30
    /// Error kind carried by `sys_input::Item::Done(Err(_))`, or the kind of
    /// a converted `std::io::Error`.
    Err(std::io::ErrorKind),
31
    /// Produced from a `tokio::time::Instant`, i.e. an item of the ticker.
    Tick,
32
}
33

34
/// Maps every low-level process event onto its [`InputData`] counterpart.
impl From<sys_input::Item> for InputData {
    fn from(item: sys_input::Item) -> Self {
        match item {
            sys_input::Item::Stdout(line) => Self::LineOut(line),
            sys_input::Item::Stderr(line) => Self::LineErr(line),
            // The process finished: split the outcome into exit status or
            // error kind.
            sys_input::Item::Done(outcome) => match outcome {
                Ok(status) => Self::Done(status),
                Err(kind) => Self::Err(kind),
            },
        }
    }
}
44

45
/// Any timer instant becomes a plain [`InputData::Tick`]; the instant itself
/// is discarded.
impl From<tokio::time::Instant> for InputData {
    fn from(_instant: tokio::time::Instant) -> Self {
        Self::Tick
    }
}
50

51
impl From<std::io::Error> for InputData {
UNCOV
52
    fn from(e: std::io::Error) -> Self {
×
53
        InputData::Err(e.kind())
×
54
    }
×
55
}
56

57
/// An [`InputData`] event paired with a timestamp.
///
/// `InputStream::poll_next` builds these with the value returned by
/// `sys_input.now()` at the start of the poll.
#[derive(Debug, Clone, PartialEq, Eq)]
58
pub struct InputItem {
59
    /// Timestamp attached to the event.
    pub time: Instant,
60
    /// The event itself.
    pub data: InputData,
61
}
62

63
impl InputItem {
64
    pub fn new<D>(time: Instant, data: D) -> InputItem
6✔
65
    where
6✔
66
        D: Into<InputData>,
6✔
67
    {
6✔
68
        Self {
6✔
69
            time,
6✔
70
            data: data.into(),
6✔
71
        }
6✔
72
    }
6✔
73
}
74

75
/// Stream of timestamped [`InputItem`]s for one command.
///
/// When polled it starts `cmd` through `sys_input` if no process is running,
/// forwards the events of the running process, and otherwise yields ticks
/// from the optional timer (see the `Stream` implementation in this file).
#[pin_project]
8✔
76
#[derive(Default)]
77
pub struct InputStream<SI: SysInputApi> {
78
    // Provides `now()` for timestamps and `run_command()` to start `cmd`
    sys_input: SI,
79
    // Command handed (cloned) to `run_command` whenever a process is started
    cmd: Cmd,
80
    // Events coming from the running process
81
    process: Option<ProcessStream>,
82
    // Tick events generated by the [`IntervalStream`] timer
83
    ticker: Option<IntervalStream>,
84
    // Set when the process reported `Item::Done`; the next poll ends the
    // stream and clears the flag and the process
    done: bool,
85
}
86

87
/// Manual `Debug` implementation for [`InputStream`].
///
/// `sys_input`, `cmd` and `done` are printed through their own `Debug`
/// implementations. The `process` and `ticker` slots are shown only as
/// `Some` or `None`; their contents are never formatted.
impl<SI: SysInputApi> std::fmt::Debug for InputStream<SI> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Arguments` implements `Debug` by writing the text verbatim, which
        // gives unquoted `None` / `Some` markers.
        let none = format_args!("None");
        let some = format_args!("Some");
        f.debug_struct("InputStream")
            .field("sys_input", &self.sys_input)
            .field("cmd", &self.cmd)
            .field(
                "process",
                if self.process.is_none() { &none } else { &some },
            )
            // Each slot reports its own state: the ticker must be derived
            // from `self.ticker`, not from `self.process`.
            .field("ticker", if self.ticker.is_none() { &none } else { &some })
            .field("done", &self.done)
            .finish()
    }
}
103

104
impl<SI: SysInputApi> InputStream<SI> {
UNCOV
105
    pub fn new(sys_input: SI, cmd: Cmd, refresh_delay: Duration) -> Result<Self> {
×
106
        Ok(Self {
×
107
            sys_input,
×
108
            cmd,
×
109
            process: None,
×
110
            ticker: Some(IntervalStream::new(tokio::time::interval(
×
111
                refresh_delay.into(),
×
112
            ))),
×
113
            done: false,
×
114
        })
×
UNCOV
115
    }
×
116
}
117

118
#[cfg(test)]
impl InputStream<SysInputVirtual> {
    /// Test-only constructor: wraps a virtual system input, uses the default
    /// command and installs no ticker.
    pub fn new_virtual(sys_input: SysInputVirtual) -> Result<Self> {
        let cmd = Cmd::default();
        let stream = Self {
            sys_input,
            cmd,
            process: None,
            ticker: None,
            done: false,
        };
        Ok(stream)
    }
}
130

131
impl<SI: SysInputApi> Stream for InputStream<SI> {
132
    type Item = InputItem;
133

134
    #[instrument(level = "debug", ret, skip(cx))]
135
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136
        let this = self.project();
137
        let now = this.sys_input.now();
138
        if *this.done {
139
            // Trigger a new execution if we are called again.
140
            *this.process = None;
141
            *this.done = false;
142
            return Poll::Ready(None);
143
        }
144
        if this.process.is_none() {
145
            // Run the process if it's not running
146
            match this.sys_input.run_command(this.cmd.clone()) {
147
                Ok(process) => *this.process = Some(process),
148
                Err(e) => return Poll::Ready(Some(InputItem::new(now, e))),
149
            }
150
            return Poll::Ready(Some(InputItem::new(now, InputData::Start)));
151
        }
152
        if let Some(process) = this.process {
153
            match Pin::new(process).poll_next(cx) {
154
                Poll::Ready(Some(item)) => {
155
                    if matches!(item, sys_input::Item::Done(_)) {
156
                        // Trigger end of stream:
157
                        *this.done = true;
158
                    }
159
                    return Poll::Ready(Some(InputItem::new(now, item)));
160
                }
161
                Poll::Ready(None) => {
162
                    *this.process = None;
163
                    *this.ticker = None;
164
                }
165
                Poll::Pending => {}
166
            }
167
        }
168
        if let Some(ticker) = this.ticker {
169
            match Pin::new(ticker).poll_next(cx) {
170
                Poll::Ready(Some(item)) => Poll::Ready(Some(InputItem::new(now, item))),
171
                _ => Poll::Pending,
172
            }
173
        } else {
174
            Poll::Ready(None)
175
        }
176
    }
177
}
178

179
#[cfg(test)]
mod tests {
    use color_eyre::Result;
    use std::io;
    use tokio_stream::StreamExt;

    use crate::sys_input::Item;
    use crate::sys_input::SysInputVirtual;
    use crate::time_wrapper::Instant;

    use super::*;

    /// Feeds `items` to a virtual system input and collects everything the
    /// resulting [`InputStream`] yields.
    async fn stream_items(items: Vec<Item>) -> Result<Vec<InputItem>> {
        let mut sys = SysInputVirtual::default();
        sys.set_items(items);
        let streamer = InputStream::new_virtual(sys)?;
        Ok(streamer.collect::<Vec<_>>().await)
    }

    /// Stamps each payload with successive `incr()` values of a default
    /// [`Instant`], in order.
    fn stamped(payloads: Vec<InputData>) -> Vec<InputItem> {
        let mut clock = Instant::default();
        payloads
            .into_iter()
            .map(|data| InputItem {
                time: clock.incr(),
                data,
            })
            .collect()
    }

    #[tokio::test]
    async fn test_basic_success() -> Result<()> {
        let streamed = stream_items(vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitStatus::default())),
        ])
        .await?;
        let expected = stamped(vec![
            InputData::Start,
            InputData::LineOut("stdout".to_owned()),
            InputData::LineErr("stderr".to_owned()),
            InputData::Done(ExitStatus::default()),
        ]);
        assert_eq!(streamed, expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_done_err() -> Result<()> {
        let streamed =
            stream_items(vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))]).await?;
        let expected = stamped(vec![
            InputData::Start,
            InputData::Err(io::ErrorKind::UnexpectedEof),
        ]);
        assert_eq!(streamed, expected);
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc