• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 16092339124

05 Jul 2025 09:45PM UTC coverage: 62.041% (+16.6%) from 45.475%
16092339124

push

github

lpenz
Complete refactoring using layers that should be easier to test

Layers connected via streams, which we can mock and test.
This combines a bunch of commits that documented this slow conversion.

344 of 539 new or added lines in 12 files covered. (63.82%)

2 existing lines in 2 files now uncovered.

389 of 627 relevant lines covered (62.04%)

1.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/view.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
use pin_project::pin_project;
6
use std::collections::VecDeque;
7
use std::pin::Pin;
8
use std::task::{Context, Poll};
9
use tokio_stream::Stream;
10

11
use crate::differ::Differ;
12
use crate::input::InputData;
13
use crate::input::InputItem;
14
use crate::input::InputStream;
15
use crate::output::ClearLine;
16
use crate::output::MoveCursorUp;
17
use crate::output::OutputCommand;
18
use crate::output::WriteAll;
19
use crate::process_wrapper::Cmd;
20
use crate::progbar::progbar_running;
21
use crate::progbar::progbar_sleeping;
22
use crate::progbar::spinner_get;
23
use crate::sys::SysApi;
24
use crate::time_wrapper::Duration;
25
use crate::time_wrapper::Instant;
26

27
#[pin_project(project = PipeProjection)]
28
pub struct Pipe<SI: SysApi> {
29
    cmd: Cmd,
30
    refresh: Duration,
31
    sleep: Duration,
32
    input: InputStream<SI>,
33
    pending: VecDeque<OutputCommand>,
34
    differ: Differ,
35
    spinner: char,
36
    start: Instant, // can be start of running or sleep
37
    duration: Option<Duration>,
38
    printed_status: bool,
39
}
40

41
impl<SI: SysApi> Pipe<SI> {
NEW
42
    pub fn new(cmd: Cmd, refresh: Duration, sleep: Duration, input: InputStream<SI>) -> Self {
×
NEW
43
        Pipe {
×
NEW
44
            cmd,
×
NEW
45
            refresh,
×
NEW
46
            sleep,
×
NEW
47
            input,
×
NEW
48
            pending: VecDeque::default(),
×
NEW
49
            differ: Differ::default(),
×
NEW
50
            spinner: '-',
×
NEW
51
            start: Instant::default(),
×
NEW
52
            duration: None,
×
NEW
53
            printed_status: false,
×
NEW
54
        }
×
NEW
55
    }
×
56
}
57

58
impl<SI: SysApi> PipeProjection<'_, SI> {
NEW
59
    fn _println(&mut self, mut s: String) {
×
NEW
60
        s.push('\n');
×
NEW
61
        self.pending
×
NEW
62
            .push_back(OutputCommand::WriteAll(WriteAll(s.as_bytes().to_vec())))
×
NEW
63
    }
×
64

NEW
65
    fn status_maybe_clear(&mut self) {
×
NEW
66
        if *self.printed_status {
×
NEW
67
            self.pending
×
NEW
68
                .push_back(OutputCommand::MoveCursorUp(MoveCursorUp(1)));
×
NEW
69
            self.pending
×
NEW
70
                .push_back(OutputCommand::ClearLine(ClearLine {}));
×
NEW
71
        }
×
NEW
72
    }
×
73

NEW
74
    fn println(&mut self, s: String) {
×
NEW
75
        self.status_maybe_clear();
×
NEW
76
        self._println(s);
×
NEW
77
        *self.printed_status = false;
×
NEW
78
    }
×
79

NEW
80
    fn process_line(&mut self, line: String) {
×
NEW
81
        self.differ.push(line);
×
NEW
82
        let mut differ = std::mem::take(self.differ);
×
NEW
83
        if differ.has_changed() {
×
NEW
84
            for line in &mut differ {
×
NEW
85
                self.println(line);
×
NEW
86
            }
×
NEW
87
        }
×
NEW
88
        *self.differ = differ;
×
NEW
89
    }
×
90

NEW
91
    fn status_update_running(&mut self, now: Instant) {
×
NEW
92
        self.status_maybe_clear();
×
NEW
93
        let mut spinner = *self.spinner;
×
NEW
94
        self._println(ofmt!(
×
NEW
95
            &now,
×
NEW
96
            "{}",
×
NEW
97
            progbar_running(
×
98
                150,                       // width: usize,
NEW
99
                &now,                      // now: &Instant,
×
NEW
100
                self.start,                // start: &Instant,
×
NEW
101
                *self.duration,            // duration: Option<&Duration>,
×
NEW
102
                self.refresh,              // refresh: &Duration,
×
NEW
103
                spinner_get(&mut spinner)  // spinner: char,
×
104
            )
NEW
105
            .unwrap()
×
106
        ));
NEW
107
        *self.spinner = spinner;
×
NEW
108
        *self.printed_status = true;
×
NEW
109
    }
×
110

NEW
111
    fn status_update_sleeping(&mut self, now: Instant, deadline: Instant) {
×
NEW
112
        self.status_maybe_clear();
×
NEW
113
        self._println(ofmt!(
×
NEW
114
            self.start,
×
NEW
115
            "{}",
×
NEW
116
            progbar_sleeping(self.sleep, &now, &deadline)
×
117
        ));
NEW
118
        *self.printed_status = true;
×
NEW
119
    }
×
120
}
121

122
impl<SI: SysApi> Stream for Pipe<SI> {
123
    type Item = OutputCommand;
124

NEW
125
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
×
NEW
126
        let mut this = self.as_mut().project();
×
NEW
127
        if let Some(output) = this.pending.pop_front() {
×
NEW
128
            return Poll::Ready(Some(output));
×
NEW
129
        }
×
NEW
130
        let item = Pin::new(&mut this.input).poll_next(cx);
×
NEW
131
        match item {
×
NEW
132
            Poll::Pending => Poll::Pending,
×
NEW
133
            Poll::Ready(Some(InputItem { time: now, data })) => match data {
×
134
                InputData::Start => {
NEW
135
                    this.println(ofmt!(&now, "start execution"));
×
NEW
136
                    this.println(format!("+ {}", this.cmd));
×
NEW
137
                    this.differ.reset();
×
NEW
138
                    *this.start = now;
×
NEW
139
                    this.status_update_running(now);
×
NEW
140
                    self.poll_next(cx)
×
141
                }
NEW
142
                InputData::LineOut(line) => {
×
NEW
143
                    this.process_line(line);
×
NEW
144
                    this.status_update_running(now);
×
NEW
145
                    self.poll_next(cx)
×
146
                }
NEW
147
                InputData::LineErr(line) => {
×
NEW
148
                    this.process_line(line);
×
NEW
149
                    this.status_update_running(now);
×
NEW
150
                    self.poll_next(cx)
×
151
                }
NEW
152
                InputData::Done(sts) => {
×
NEW
153
                    this.println(ofmt!(&now, "done {:?}", sts));
×
NEW
154
                    *this.duration = Some(&now - this.start);
×
155
                    // Sleeping starts now
NEW
156
                    *this.start = now;
×
NEW
157
                    self.poll_next(cx)
×
158
                }
NEW
159
                InputData::Err(e) => {
×
NEW
160
                    this.println(ofmt!(&now, "err {:?}", e));
×
NEW
161
                    self.poll_next(cx)
×
162
                }
163
                InputData::RunTick => {
NEW
164
                    this.status_update_running(now);
×
NEW
165
                    self.poll_next(cx)
×
166
                }
NEW
167
                InputData::SleepTick(deadline) => {
×
NEW
168
                    this.status_update_sleeping(now, deadline);
×
NEW
169
                    self.poll_next(cx)
×
170
                }
171
            },
NEW
172
            Poll::Ready(None) => Poll::Ready(None),
×
173
        }
NEW
174
    }
×
175
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc