• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lpenz / ogle / 21593360816

02 Feb 2026 02:08PM UTC coverage: 59.326% (-0.4%) from 59.713%
21593360816

push

github

lpenz
Migrate to crossterm with a tokio thread that pools the keyboard

We were using line streaming on stdin, but that doesn't provide a
cancellation point, which is why the -z flag was not working.

Fixes #9

0 of 9 new or added lines in 1 file covered. (0.0%)

458 of 772 relevant lines covered (59.33%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.78
/src/user_wrapper.rs
1
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
2
// This file is subject to the terms and conditions defined in
3
// file 'LICENSE', which is part of this source code package.
4

5
//! Wrapper for user interaction.
6
//!
7
//! For now we just check if the user has typed ENTER, which makes
8
//! ogle exit after the current run is over.
9

10
use crossterm::event::{self, Event};
11
use crossterm::tty::IsTty;
12
use pin_project::pin_project;
13
use std::pin::Pin;
14
use std::task::{Context, Poll};
15
use tokio::io;
16
use tokio::sync::mpsc;
17
use tokio::task::JoinHandle;
18
use tokio::time::{Duration, sleep};
19
use tokio_stream::Stream;
20
use tracing::instrument;
21

22
/// A wrapper for `crossterm` that polls the keyboard and provides the
23
/// keypress in a tokio stream.
24
///
25
/// Also provides a virtual implementation for use in tests.
26
#[pin_project(project = UserStreamProj)]
27
#[derive(Debug, Default)]
28
pub enum UserStream {
29
    /// A real implementation that reads a line from stdin.
30
    Real(mpsc::UnboundedReceiver<String>, JoinHandle<()>),
31
    /// A virtual implementation that doesn't do anything.
32
    #[default]
33
    Virtual,
34
}
35

36
impl UserStream {
37
    pub fn new_real() -> Option<UserStream> {
×
38
        let stdin = io::stdin();
×
NEW
39
        if stdin.is_tty() {
×
NEW
40
            let (tx, rx) = mpsc::unbounded_channel::<String>();
×
NEW
41
            let handle = tokio::spawn(async move {
×
42
                loop {
NEW
43
                    if matches!(event::poll(Duration::from_secs(0)), Ok(true))
×
NEW
44
                        && let Ok(Event::Key(key_event)) = event::read()
×
45
                    {
NEW
46
                        tx.send(format!("{}", key_event.code))
×
NEW
47
                            .unwrap_or_else(|e| panic!("error reading key event: {}", e));
×
48
                    } else {
49
                        // We tokio-sleep here to provide a cancellation point:
NEW
50
                        sleep(Duration::from_millis(127)).await;
×
51
                    }
52
                }
53
            });
NEW
54
            Some(UserStream::Real(rx, handle))
×
55
        } else {
56
            None
×
57
        }
58
    }
×
59

60
    pub fn new_virtual() -> UserStream {
2✔
61
        UserStream::Virtual
2✔
62
    }
2✔
63
}
64

65
impl Stream for UserStream {
66
    type Item = String;
67

68
    #[instrument(level = "debug", ret, skip(cx))]
69
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2✔
70
        let this = self.project();
71
        match this {
72
            UserStreamProj::Real(rx, _) => {
73
                let next = Pin::new(rx).poll_recv(cx);
74
                match next {
75
                    Poll::Ready(Some(s)) => Poll::Ready(Some(s)),
76
                    Poll::Ready(None) => Poll::Ready(None),
77
                    Poll::Pending => Poll::Pending,
78
                }
79
            }
80
            UserStreamProj::Virtual {} => Poll::Ready(None),
81
        }
82
    }
2✔
83
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc