• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 12681295861

09 Jan 2025 12:08AM UTC coverage: 35.714% (+20.6%) from 15.129%
12681295861

push

github

lpenz
stream: create StreamItem::Err to map wait's error return

1 of 4 new or added lines in 2 files covered. (25.0%)

1 existing line in 1 file now uncovered.

120 of 336 relevant lines covered (35.71%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.53
/src/stream.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use color_eyre::Result;
6
use std::convert::TryFrom;
7
use std::process::ExitStatus;
8
use tokio::time;
9
use tokio_process_stream as tps;
10
use tokio_stream::wrappers::IntervalStream;
11
use tokio_stream::StreamExt;
12

13
use crate::cli::Cli;
14

15
/// Item type of the stream returned by `stream_create`: events converted
/// from `tps::Item<String>` plus ticks converted from `time::Instant`.
#[derive(Debug)]
16
pub enum StreamItem {
17
    /// Converted from `tps::Item::Stdout`; carries its `String` payload.
    LineOut(String),
18
    /// Converted from `tps::Item::Stderr`; carries its `String` payload.
    LineErr(String),
19
    /// Converted from `tps::Item::Done(Ok(..))`; carries the exit status.
    Done(ExitStatus),
20
    /// Converted from `tps::Item::Done(Err(..))`; carries the I/O error.
    Err(std::io::Error),
21
    /// Produced for every `time::Instant` yielded by the interval stream.
    Tick,
22
}
23

24
/// Maps each `tps::Item<String>` variant onto the matching `StreamItem`.
impl From<tps::Item<String>> for StreamItem {
25
    /// Converts `item`, splitting `Done` into `StreamItem::Done` (on `Ok`)
    /// and `StreamItem::Err` (on `Err`).
    fn from(item: tps::Item<String>) -> Self {
6✔
26
        match item {
4✔
27
            tps::Item::Stdout(l) => StreamItem::LineOut(l),
1✔
28
            tps::Item::Stderr(l) => StreamItem::LineErr(l),
1✔
29
            tps::Item::Done(Ok(sts)) => StreamItem::Done(sts),
4✔
NEW
30
            // The error is forwarded as a stream item rather than dropped.
            tps::Item::Done(Err(e)) => StreamItem::Err(e),
×
31
        }
32
    }
6✔
33
}
34

35
/// Lets the interval stream's `time::Instant` items be mapped to `StreamItem`.
impl From<time::Instant> for StreamItem {
36
    /// Always returns `StreamItem::Tick`; the instant itself is discarded.
    fn from(_: time::Instant) -> Self {
3✔
37
        StreamItem::Tick
3✔
38
    }
3✔
39
}
40

41
/// Builds a single stream of `StreamItem`s by merging a process stream with
/// a periodic ticker.
///
/// * `cli` - source of the command, obtained through `cli.get_command()`.
/// * `refresh_delay` - period passed to `time::interval` for the ticker.
///
/// # Errors
///
/// Returns the error from `tps::ProcessStream::try_from` if it fails.
pub fn stream_create(
4✔
42
    cli: &Cli,
4✔
43
    refresh_delay: time::Duration,
4✔
44
) -> Result<impl StreamExt<Item = StreamItem> + std::marker::Unpin + Send + 'static> {
4✔
45
    let cmd = cli.get_command();
4✔
46
    // Wrap the command in a process stream and convert its items.
    let procstream = tps::ProcessStream::try_from(cmd)?.map(StreamItem::from);
4✔
47
    // Each interval instant becomes a `StreamItem::Tick` once mapped below.
    let ticker = IntervalStream::new(time::interval(refresh_delay));
4✔
48
    Ok(procstream.merge(ticker.map(StreamItem::from)))
4✔
49
}
4✔
50

51
// Tests for `stream_create`, driven through a `Cli` parsed from argument lists.
#[cfg(test)]
52
mod tests {
53
    use clap::Parser;
54
    use color_eyre::eyre::eyre;
55
    use color_eyre::Result;
56
    use tokio::time;
57
    use tokio_stream::StreamExt;
58

59
    use crate::cli::Cli;
60

61
    use super::*;
62

63
    /// Parses `cmd` with `Cli::try_parse_from` and creates a stream from it
    /// using a fixed refresh delay of 5000 ms.
    async fn stream_cmd(
4✔
64
        cmd: &[&str],
4✔
65
    ) -> Result<impl StreamExt<Item = StreamItem> + std::marker::Unpin + Send + 'static> {
4✔
66
        let duration = time::Duration::from_millis(5000);
4✔
67
        stream_create(&Cli::try_parse_from(cmd)?, duration)
4✔
68
    }
4✔
69

70
    /// Returns the next item of `stream` that is not a `StreamItem::Tick`.
    ///
    /// Fails with "no item received" if the stream ends first.
    async fn stream_next<T>(stream: &mut T) -> Result<StreamItem>
6✔
71
    where
6✔
72
        T: StreamExt<Item = StreamItem> + std::marker::Unpin + Send + 'static,
6✔
73
    {
6✔
74
        while let Some(item) = stream.next().await {
9✔
75
            match item {
9✔
76
                // Ticks are skipped so the tests only see the other items.
                StreamItem::Tick => {
77
                    continue;
3✔
78
                }
79
                _ => {
80
                    return Ok(item);
6✔
81
                }
82
            }
83
        }
UNCOV
84
        Err(eyre!("no item received"))
×
85
    }
6✔
86

87
    /// Runs `true` and expects a `Done` item with a successful status.
    #[tokio::test]
88
    async fn test_true() -> Result<()> {
1✔
89
        let mut stream = stream_cmd(&["ogle", "true"]).await?;
1✔
90
        let item = stream_next(&mut stream).await?;
1✔
91
        let StreamItem::Done(sts) = item else {
1✔
92
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
93
        };
1✔
94
        assert!(sts.success());
1✔
95
        Ok(())
1✔
96
    }
1✔
97

98
    /// Runs `false` and expects a `Done` item with an unsuccessful status.
    #[tokio::test]
99
    async fn test_false() -> Result<()> {
1✔
100
        let mut stream = stream_cmd(&["ogle", "false"]).await?;
1✔
101
        let item = stream_next(&mut stream).await?;
1✔
102
        let StreamItem::Done(sts) = item else {
1✔
103
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
104
        };
1✔
105
        assert!(!sts.success());
1✔
106
        Ok(())
1✔
107
    }
1✔
108

109
    /// Runs `echo test` and expects `LineOut("test")` followed by a
    /// successful `Done`.
    #[tokio::test]
110
    async fn test_echo() -> Result<()> {
1✔
111
        let mut stream = stream_cmd(&["ogle", "echo", "test"]).await?;
1✔
112
        let item = stream_next(&mut stream).await?;
1✔
113
        let StreamItem::LineOut(s) = item else {
1✔
114
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
115
        };
1✔
116
        assert_eq!(s, "test");
1✔
117
        let item = stream_next(&mut stream).await?;
1✔
118
        let StreamItem::Done(sts) = item else {
1✔
119
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
120
        };
1✔
121
        assert!(sts.success());
1✔
122
        Ok(())
1✔
123
    }
1✔
124

125
    /// Runs a shell command that writes "test" to stderr and expects
    /// `LineErr("test")` followed by a successful `Done`.
    #[tokio::test]
126
    async fn test_stderr() -> Result<()> {
1✔
127
        let mut stream = stream_cmd(&["ogle", "--", "/bin/sh", "-c", "echo test >&2"]).await?;
1✔
128
        let item = stream_next(&mut stream).await?;
1✔
129
        let StreamItem::LineErr(s) = item else {
1✔
130
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
131
        };
1✔
132
        assert_eq!(s, "test");
1✔
133
        let item = stream_next(&mut stream).await?;
1✔
134
        let StreamItem::Done(sts) = item else {
1✔
135
            return Err(eyre!("unexpected stream item {:?}", item));
1✔
136
        };
1✔
137
        assert!(sts.success());
1✔
138
        Ok(())
1✔
139
    }
1✔
140
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc