• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 12681295861

09 Jan 2025 12:08AM UTC coverage: 35.714% (+20.6%) from 15.129%
12681295861

push

github

lpenz
stream: create StreamItem::Err to map wait's error return

1 of 4 new or added lines in 2 files covered. (25.0%)

1 existing line in 1 file now uncovered.

120 of 336 relevant lines covered (35.71%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/runner.rs
1
// Copyright (C) 2020 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use color_eyre::Result;
6
use std::process::ExitStatus;
7
use tokio::time;
8
use tokio_stream::StreamExt;
9

10
use crate::cli::Cli;
11
use crate::misc::localnow;
12
use crate::progbar::Progbar;
13
use crate::stream::stream_create;
14
use crate::stream::StreamItem;
15

16
const REFRESH_DELAY: time::Duration = time::Duration::from_millis(250);
17

18
/// Result of a single execution of the watched command.
///
/// The `Default` value (no status, no output, zero duration) is what
/// `run_loop` passes as the "previous run" of the very first execution.
#[derive(Debug, Default, Clone)]
pub struct RunData {
    /// Exit status of the run; `None` when no run has completed yet.
    status: Option<ExitStatus>,
    /// Lines collected while the command was running.
    output: Vec<String>,
    /// How long the run took, as measured by `run_once`.
    duration: time::Duration,
}

impl RunData {
    /// Returns `true` only when an exit status is present and it reports
    /// success; a missing status counts as "not successful".
    pub fn success(&self) -> bool {
        self.status.is_some_and(|s| s.success())
    }
}
30

31
/// Prints a "changed" banner followed by every line gathered so far.
///
/// The progress bar is hidden via `pb.hide()` before anything is written and
/// shown again via `pb.show()` afterwards (presumably so the bar does not
/// interleave with the printed text).
///
/// Output, in order: an empty line, `=> <localnow()> changed`,
/// `+ <cmdline>`, then each entry of `lines` on its own line.
///
/// # Errors
///
/// Propagates any error returned by `pb.hide()` or `pb.show()`.
pub fn print_backlog(pb: &mut Progbar, cmdline: &str, lines: &[String]) -> Result<()> {
    pb.hide()?;

    // Banner: blank separator, timestamped marker, then the command itself.
    println!();
    println!("=> {} changed", localnow());
    println!("+ {}", cmdline);

    // Replay the whole backlog.
    lines.iter().for_each(|entry| println!("{}", entry));

    pb.show()?;
    Ok(())
}
×
42

43
pub async fn stream_task<T>(
×
44
    cmdline: &str,
×
45
    last_lines: Vec<String>,
×
46
    last_period: time::Duration,
×
47
    mut stream: T,
×
48
    pb: &mut Progbar,
×
49
) -> Result<(ExitStatus, Vec<String>)>
×
50
where
×
51
    T: StreamExt<Item = StreamItem> + std::marker::Unpin + Send + 'static,
×
52
{
×
53
    let mut lines = vec![];
×
54
    let mut different = false;
×
55
    let mut nlines = 0;
×
56
    pb.set_running(last_period);
×
57
    while let Some(item) = stream.next().await {
×
58
        let (stsopt, lineopt) = match item {
×
59
            StreamItem::LineOut(line) => (None, Some(line)),
×
60
            StreamItem::LineErr(line) => (None, Some(line)),
×
61
            StreamItem::Tick => {
62
                pb.refresh()?;
×
63
                (None, None)
×
64
            }
65
            StreamItem::Done(sts) => (
×
66
                Some(sts),
×
67
                if let Some(code) = sts.code() {
×
68
                    if code == 0 {
×
69
                        None
×
70
                    } else {
71
                        Some(format!("=> exit code {}", code))
×
72
                    }
73
                } else {
74
                    Some("=> error getting exit code".to_string())
×
75
                },
76
            ),
NEW
77
            StreamItem::Err(e) => {
×
NEW
78
                panic!("{}", e);
×
79
            }
80
        };
81
        if let Some(line) = lineopt {
×
82
            lines.push(line);
×
83
            nlines += 1;
×
84
            if different {
×
85
                pb.hide()?;
×
86
                println!("{}", lines[nlines - 1]);
×
87
                pb.show()?;
×
88
            } else if last_lines.len() < nlines || lines[nlines - 1] != last_lines[nlines - 1] {
×
89
                // Print everything so far
90
                print_backlog(pb, cmdline, &lines)?;
×
91
                different = true;
×
92
            }
×
93
        }
×
94
        if let Some(sts) = stsopt {
×
95
            /* Process is done, check if we got less lines: */
96
            if !different && last_lines.len() > nlines {
×
97
                print_backlog(pb, cmdline, &lines)?;
×
98
            }
×
99
            return Ok((sts, lines));
×
100
        }
×
101
    }
102
    panic!("stream ended before process");
×
103
}
×
104

105
pub async fn run_once(cli: &Cli, last_rundata: RunData, pb: &mut Progbar) -> Result<RunData> {
×
106
    let start = time::Instant::now();
×
107
    let stream = stream_create(cli, REFRESH_DELAY)?;
×
108
    let cmdline = cli.command.join(" ");
×
109
    let task = stream_task(
×
110
        &cmdline,
×
111
        last_rundata.output,
×
112
        last_rundata.duration,
×
113
        stream,
×
114
        pb,
×
115
    );
×
116
    let (status, vecboth) = task.await?;
×
117
    Ok(RunData {
×
118
        status: Some(status),
×
119
        output: vecboth,
×
120
        duration: start.elapsed(),
×
121
    })
×
122
}
×
123

124
pub async fn run_loop(cli: &Cli) -> Result<()> {
×
125
    let mut pb = Progbar::default();
×
126
    let mut last_rundata = run_once(cli, RunData::default(), &mut pb).await?;
×
127
    if cli.until_success && last_rundata.success() || cli.until_failure && !last_rundata.success() {
×
128
        return Ok(());
×
129
    }
×
130
    let cli_period = time::Duration::from_secs(cli.period);
×
131
    loop {
132
        let rundata = run_once(cli, last_rundata, &mut pb).await?;
×
133
        if cli.until_success && rundata.success() || cli.until_failure && !rundata.success() {
×
134
            return Ok(());
×
135
        }
×
136
        last_rundata = rundata;
×
137
        pb.set_sleep(cli_period);
×
138
        let end = time::Instant::now() + cli_period;
×
139
        while time::Instant::now() < end {
×
140
            pb.refresh()?;
×
141
            time::sleep(REFRESH_DELAY).await;
×
142
        }
143
    }
144
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc