• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8133481532

03 Mar 2024 10:30PM UTC coverage: 55.91% (-0.7%) from 56.575%
8133481532

push

github

web-flow
Merge pull request #68 from zhiburt/patch-move-back-to-trait-based-version

[WIP] Move to trait based version `Expect`

219 of 377 new or added lines in 14 files covered. (58.09%)

155 existing lines in 6 files now uncovered.

1490 of 2665 relevant lines covered (55.91%)

3.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/session/async_session.rs
1
//! Module contains an async version of Session structure.
2

3
use std::{
4
    io::{self, IoSliceMut},
5
    pin::Pin,
6
    task::{Context, Poll},
7
    time::Duration,
8
};
9

10
use futures_lite::{
11
    ready, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
12
};
13

14
use crate::{
15
    process::{Healthcheck, Termios},
16
    AsyncExpect, Captures, Error, Expect, Needle,
17
};
18

19
/// Session represents a spawned process and its streams.
20
/// It controlls process and communication with it.
21
#[derive(Debug)]
22
pub struct Session<P, S> {
23
    process: P,
24
    stream: Stream<S>,
25
}
26

27
// GEt back to the solution where Logger is just dyn Write instead of all these magic with type system.....
28

29
impl<P, S> Session<P, S> {
30
    /// Create a new session.
31
    pub fn new(process: P, stream: S) -> io::Result<Self> {
×
32
        Ok(Self {
×
33
            process,
×
34
            stream: Stream::new(stream),
×
35
        })
36
    }
37

38
    /// Get a reference to original stream.
39
    pub fn get_stream(&self) -> &S {
×
40
        self.stream.as_ref()
×
41
    }
42

43
    /// Get a mut reference to original stream.
44
    pub fn get_stream_mut(&mut self) -> &mut S {
×
45
        self.stream.as_mut()
×
46
    }
47

48
    /// Get a reference to a process running program.
49
    pub fn get_process(&self) -> &P {
×
50
        &self.process
×
51
    }
52

53
    /// Get a mut reference to a process running program.
54
    pub fn get_process_mut(&mut self) -> &mut P {
×
55
        &mut self.process
×
56
    }
57

58
    /// Set the pty session's expect timeout.
59
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
60
        self.stream.set_expect_timeout(expect_timeout);
×
61
    }
62

63
    /// Set a expect algorithm to be either gready or lazy.
64
    ///
65
    /// Default algorithm is gready.
66
    ///
67
    /// See [Session::expect].
68
    pub fn set_expect_lazy(&mut self, is_lazy: bool) {
×
69
        self.stream.expect_lazy = is_lazy;
×
70
    }
71

72
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R>(
×
73
        mut self,
74
        new_stream: F,
75
    ) -> Result<Session<P, R>, Error> {
76
        let buf = self.stream.get_available().to_owned();
×
77

78
        let stream = self.stream.into_inner();
×
79
        let stream = new_stream(stream);
×
80
        let mut session = Session::new(self.process, stream)?;
×
81
        session.stream.keep(&buf);
×
82
        Ok(session)
×
83
    }
84

85
    /// Verifyes if stream is empty or not.
86
    pub async fn is_empty(&mut self) -> io::Result<bool>
87
    where
88
        S: AsyncRead + Unpin,
89
    {
NEW
90
        self.stream.is_empty().await
×
91
    }
92
}
93

94
impl<P, S> AsyncExpect for Session<P, S>
95
where
96
    S: AsyncWrite + AsyncRead + Unpin,
97
{
98
    async fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
99
    where
100
        N: Needle,
101
    {
102
        match self.stream.expect_lazy {
×
103
            true => self.stream.expect_lazy(needle).await,
×
104
            false => self.stream.expect_gready(needle).await,
×
105
        }
106
    }
107

108
    async fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
109
    where
110
        N: Needle,
111
    {
UNCOV
112
        self.stream.check(needle).await
×
113
    }
114

115
    async fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
116
    where
117
        N: Needle,
118
    {
UNCOV
119
        self.stream.is_matched(needle).await
×
120
    }
121

122
    async fn send<B>(&mut self, buf: B) -> Result<(), Error>
123
    where
124
        B: AsRef<[u8]>,
125
    {
NEW
126
        self.stream.write_all(buf.as_ref()).await.map_err(Error::IO)
×
127
    }
128

129
    async fn send_line<B>(&mut self, buf: B) -> Result<(), Error>
130
    where
131
        B: AsRef<[u8]>,
132
    {
133
        #[cfg(windows)]
134
        const LINE_ENDING: &[u8] = b"\r\n";
135
        #[cfg(not(windows))]
136
        const LINE_ENDING: &[u8] = b"\n";
137

138
        self.stream.write_all(buf.as_ref()).await?;
×
139
        self.stream.write_all(LINE_ENDING).await?;
×
140

141
        Ok(())
×
142
    }
143
}
144

145
impl<P, S> Healthcheck for Session<P, S>
146
where
147
    P: Healthcheck,
148
{
149
    type Status = P::Status;
150

151
    /// Verifies whether process is still alive.
NEW
152
    fn is_alive(&self) -> io::Result<bool> {
×
NEW
153
        P::is_alive(self.get_process())
×
154
    }
155

NEW
156
    fn get_status(&self) -> io::Result<Self::Status> {
×
NEW
157
        P::get_status(self.get_process())
×
158
    }
159
}
160

161
impl<P, S> AsyncWrite for Session<P, S>
162
where
163
    P: Unpin,
164
    S: AsyncWrite + Unpin,
165
{
UNCOV
166
    fn poll_write(
×
167
        self: Pin<&mut Self>,
168
        cx: &mut Context<'_>,
169
        buf: &[u8],
170
    ) -> Poll<io::Result<usize>> {
171
        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
×
172
    }
173

174
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
175
        Pin::new(&mut self.stream).poll_flush(cx)
×
176
    }
177

178
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
179
        Pin::new(&mut self.stream).poll_close(cx)
×
180
    }
181

182
    fn poll_write_vectored(
×
183
        mut self: Pin<&mut Self>,
184
        cx: &mut Context<'_>,
185
        bufs: &[io::IoSlice<'_>],
186
    ) -> Poll<io::Result<usize>> {
187
        Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
×
188
    }
189
}
190

191
impl<P, S> AsyncRead for Session<P, S>
192
where
193
    P: Unpin,
194
    S: AsyncRead + Unpin,
195
{
UNCOV
196
    fn poll_read(
×
197
        mut self: Pin<&mut Self>,
198
        cx: &mut Context<'_>,
199
        buf: &mut [u8],
200
    ) -> Poll<io::Result<usize>> {
201
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
202
    }
203
}
204

205
impl<P, S> AsyncBufRead for Session<P, S>
206
where
207
    P: Unpin,
208
    S: AsyncRead + Unpin,
209
{
210
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
211
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
212
    }
213

214
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
215
        Pin::new(&mut self.stream).consume(amt);
×
216
    }
217
}
218

219
impl<P, S> Termios for Session<P, S>
220
where
221
    P: Termios,
222
{
NEW
223
    fn is_echo(&self) -> io::Result<bool> {
×
NEW
224
        P::is_echo(self.get_process())
×
225
    }
226

NEW
227
    fn set_echo(&mut self, on: bool) -> io::Result<bool> {
×
NEW
228
        P::set_echo(self.get_process_mut(), on)
×
229
    }
230
}
231

232
/// Session represents a spawned process and its streams.
233
/// It controlls process and communication with it.
234
#[derive(Debug)]
235
struct Stream<S> {
236
    stream: BufferedStream<S>,
237
    expect_timeout: Option<Duration>,
238
    expect_lazy: bool,
239
}
240

241
impl<S> Stream<S> {
242
    /// Creates an async IO stream.
243
    fn new(stream: S) -> Self {
×
244
        Self {
245
            stream: BufferedStream::new(stream),
×
246
            expect_timeout: Some(Duration::from_millis(10000)),
×
247
            expect_lazy: false,
248
        }
249
    }
250

251
    /// Returns a reference to original stream.
252
    fn as_ref(&self) -> &S {
×
253
        &self.stream.stream
×
254
    }
255

256
    /// Returns a mut reference to original stream.
257
    fn as_mut(&mut self) -> &mut S {
×
258
        &mut self.stream.stream
×
259
    }
260

261
    /// Set the pty session's expect timeout.
262
    fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
263
        self.expect_timeout = expect_timeout;
×
264
    }
265

266
    /// Save a bytes in inner buffer.
267
    /// They'll be pushed to the end of the buffer.
268
    fn keep(&mut self, buf: &[u8]) {
×
269
        self.stream.keep(buf);
×
270
    }
271

272
    /// Get an inner buffer.
273
    fn get_available(&mut self) -> &[u8] {
×
274
        self.stream.buffer()
×
275
    }
276

277
    /// Returns an inner IO stream.
278
    fn into_inner(self) -> S {
×
279
        self.stream.stream
×
280
    }
281
}
282

283
impl<S> Stream<S>
284
where
285
    S: AsyncRead + Unpin,
286
{
287
    async fn expect_gready<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
288
        let expect_timeout = self.expect_timeout;
×
289

290
        let expect_future = async {
×
291
            let mut eof = false;
×
292
            loop {
×
293
                let data = self.stream.buffer();
×
294

295
                let found = Needle::check(&needle, data, eof)?;
×
296

297
                if !found.is_empty() {
×
298
                    let end_index = Captures::right_most_index(&found);
×
299
                    let involved_bytes = data[..end_index].to_vec();
×
300
                    self.stream.consume(end_index);
×
301

302
                    return Ok(Captures::new(involved_bytes, found));
×
303
                }
304

305
                if eof {
×
306
                    return Err(Error::Eof);
×
307
                }
308

309
                eof = self.stream.fill().await? == 0;
×
310
            }
311
        };
312

313
        if let Some(timeout) = expect_timeout {
×
314
            let timeout_future = futures_timer::Delay::new(timeout);
×
315
            futures_lite::future::or(expect_future, async {
×
316
                timeout_future.await;
×
317
                Err(Error::ExpectTimeout)
×
318
            })
319
            .await
×
320
        } else {
321
            expect_future.await
×
322
        }
323
    }
324

325
    async fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
326
    where
327
        N: Needle,
328
    {
329
        let expect_timeout = self.expect_timeout;
×
330
        let expect_future = async {
×
331
            // We read by byte to make things as lazy as possible.
332
            //
333
            // It's chose is important in using Regex as a Needle.
334
            // Imagine we have a `\d+` regex.
335
            // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
336
            //
337
            // The second reason is
338
            // if we wouldn't read by byte EOF indication could be lost.
339
            // And next blocking std::io::Read operation could be blocked forever.
340
            //
341
            // We could read all data available via `read_available` to reduce IO operations,
342
            // but in such case we would need to keep a EOF indicator internally in stream,
343
            // which is OK if EOF happens onces, but I am not sure if this is a case.
344

345
            let mut checked_length = 0;
×
346
            let mut eof = false;
×
347
            loop {
×
348
                let available = self.stream.buffer();
×
349
                let is_buffer_checked = checked_length == available.len();
×
350
                if is_buffer_checked {
×
351
                    let n = self.stream.fill().await?;
×
352
                    eof = n == 0;
×
353
                }
354

355
                // We intentinally not increase the counter
356
                // and run check one more time even though the data isn't changed.
357
                // Because it may be important for custom implementations of Needle.
358
                let available = self.stream.buffer();
×
359
                if checked_length < available.len() {
×
360
                    checked_length += 1;
×
361
                }
362

363
                let data = &available[..checked_length];
×
364
                let found = Needle::check(&needle, data, eof)?;
×
365
                if !found.is_empty() {
×
366
                    let end_index = Captures::right_most_index(&found);
×
367
                    let involved_bytes = data[..end_index].to_vec();
×
368
                    self.stream.consume(end_index);
×
369
                    return Ok(Captures::new(involved_bytes, found));
×
370
                }
371

372
                if eof {
×
373
                    return Err(Error::Eof);
×
374
                }
375
            }
376
        };
377

378
        if let Some(timeout) = expect_timeout {
×
379
            let timeout_future = futures_timer::Delay::new(timeout);
×
380
            futures_lite::future::or(expect_future, async {
×
381
                timeout_future.await;
×
382
                Err(Error::ExpectTimeout)
×
383
            })
384
            .await
×
385
        } else {
386
            expect_future.await
×
387
        }
388
    }
389

390
    /// Is matched checks if a pattern is matched.
391
    /// It doesn't consumes bytes from stream.
392
    async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
×
393
        let eof = self.try_fill().await?;
×
394
        let buf = self.stream.buffer();
×
395

396
        let found = needle.check(buf, eof)?;
×
397
        if !found.is_empty() {
×
398
            return Ok(true);
×
399
        }
400

401
        if eof {
×
402
            return Err(Error::Eof);
×
403
        }
404

405
        Ok(false)
406
    }
407

408
    /// Check checks if a pattern is matched.
409
    /// Returns empty found structure if nothing found.
410
    async fn check<E>(&mut self, needle: E) -> Result<Captures, Error>
411
    where
412
        E: Needle,
413
    {
UNCOV
414
        let eof = self.try_fill().await?;
×
415

416
        let buf = self.stream.buffer();
×
417
        let found = needle.check(buf, eof)?;
×
418
        if !found.is_empty() {
×
419
            let end_index = Captures::right_most_index(&found);
×
420
            let involved_bytes = buf[..end_index].to_vec();
×
421
            self.stream.consume(end_index);
×
422
            return Ok(Captures::new(involved_bytes, found));
×
423
        }
424

425
        if eof {
×
426
            return Err(Error::Eof);
×
427
        }
428

429
        Ok(Captures::new(Vec::new(), Vec::new()))
×
430
    }
431

432
    /// Verifyes if stream is empty or not.
433
    async fn is_empty(&mut self) -> io::Result<bool> {
×
434
        match futures_lite::future::poll_once(self.read(&mut [])).await {
×
435
            Some(Ok(0)) => Ok(true),
436
            Some(Ok(_)) => Ok(false),
437
            Some(Err(err)) => Err(err),
×
438
            None => Ok(true),
439
        }
440
    }
441

442
    async fn try_fill(&mut self) -> Result<bool, Error> {
×
443
        match futures_lite::future::poll_once(self.stream.fill()).await {
×
444
            Some(Ok(n)) => Ok(n == 0),
×
445
            Some(Err(err)) => Err(err.into()),
×
446
            None => Ok(false),
447
        }
448
    }
449
}
450

451
impl<S> AsyncWrite for Stream<S>
452
where
453
    S: AsyncWrite + Unpin,
454
{
UNCOV
455
    fn poll_write(
×
456
        mut self: Pin<&mut Self>,
457
        cx: &mut Context<'_>,
458
        buf: &[u8],
459
    ) -> Poll<io::Result<usize>> {
460
        Pin::new(&mut *self.stream.get_mut()).poll_write(cx, buf)
×
461
    }
462

463
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
464
        Pin::new(&mut *self.stream.get_mut()).poll_flush(cx)
×
465
    }
466

467
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
468
        Pin::new(&mut *self.stream.get_mut()).poll_close(cx)
×
469
    }
470

471
    fn poll_write_vectored(
×
472
        mut self: Pin<&mut Self>,
473
        cx: &mut Context<'_>,
474
        bufs: &[io::IoSlice<'_>],
475
    ) -> Poll<io::Result<usize>> {
476
        Pin::new(&mut *self.stream.get_mut()).poll_write_vectored(cx, bufs)
×
477
    }
478
}
479

480
impl<S> AsyncRead for Stream<S>
481
where
482
    S: AsyncRead + Unpin,
483
{
UNCOV
484
    fn poll_read(
×
485
        mut self: Pin<&mut Self>,
486
        cx: &mut Context<'_>,
487
        buf: &mut [u8],
488
    ) -> Poll<io::Result<usize>> {
489
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
490
    }
491

NEW
492
    fn poll_read_vectored(
×
493
        self: Pin<&mut Self>,
494
        cx: &mut Context<'_>,
495
        bufs: &mut [IoSliceMut<'_>],
496
    ) -> Poll<io::Result<usize>> {
NEW
497
        for b in bufs {
×
NEW
498
            if !b.is_empty() {
×
NEW
499
                return self.poll_read(cx, b);
×
500
            }
501
        }
502

NEW
503
        self.poll_read(cx, &mut [])
×
504
    }
505
}
506

507
impl<S> AsyncBufRead for Stream<S>
508
where
509
    S: AsyncRead + Unpin,
510
{
511
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
512
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
513
    }
514

515
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
516
        Pin::new(&mut self.stream).consume(amt);
×
517
    }
518
}
519

520
/// Session represents a spawned process and its streams.
521
/// It controlls process and communication with it.
522
#[derive(Debug)]
523
struct BufferedStream<S> {
524
    stream: S,
525
    buffer: Vec<u8>,
526
    length: usize,
527
}
528

529
impl<S> BufferedStream<S> {
530
    fn new(stream: S) -> Self {
×
531
        Self {
532
            stream,
533
            buffer: Vec::new(),
×
534
            length: 0,
535
        }
536
    }
537

538
    fn keep(&mut self, buf: &[u8]) {
×
539
        self.buffer.extend(buf);
×
540
        self.length += buf.len();
×
541
    }
542

543
    fn buffer(&self) -> &[u8] {
×
544
        &self.buffer[..self.length]
×
545
    }
546

547
    fn get_mut(&mut self) -> &mut S {
×
548
        &mut self.stream
×
549
    }
550
}
551

552
impl<S: AsyncRead + Unpin> BufferedStream<S> {
553
    async fn fill(&mut self) -> io::Result<usize> {
×
554
        let mut buf = [0; 128];
×
555
        let n = self.stream.read(&mut buf).await?;
×
556
        self.keep(&buf[..n]);
×
557
        Ok(n)
×
558
    }
559
}
560

561
impl<S: AsyncRead + Unpin> AsyncRead for BufferedStream<S> {
562
    fn poll_read(
×
563
        mut self: Pin<&mut Self>,
564
        cx: &mut Context<'_>,
565
        buf: &mut [u8],
566
    ) -> Poll<io::Result<usize>> {
567
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
568
        let nread = std::io::Read::read(&mut rem, buf)?;
×
569
        self.consume(nread);
×
570
        Poll::Ready(Ok(nread))
×
571
    }
572

573
    fn poll_read_vectored(
×
574
        mut self: Pin<&mut Self>,
575
        cx: &mut Context<'_>,
576
        bufs: &mut [IoSliceMut<'_>],
577
    ) -> Poll<io::Result<usize>> {
578
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
579
        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
×
580
        self.consume(nread);
×
581
        Poll::Ready(Ok(nread))
×
582
    }
583
}
584

585
impl<S: AsyncRead + Unpin> AsyncBufRead for BufferedStream<S> {
586
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
587
        if self.buffer.is_empty() {
×
588
            let mut buf = [0; 128];
×
589
            let n = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
×
590
            self.keep(&buf[..n]);
×
591
        }
592

593
        let buf = self.get_mut().buffer();
×
594
        Poll::Ready(Ok(buf))
×
595
    }
596

597
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
598
        let _ = self.buffer.drain(..amt);
×
599
        self.length -= amt;
×
600
    }
601
}
602

603
#[cfg(test)]
604
mod tests {
605
    use futures_lite::AsyncWriteExt;
606

607
    use crate::Eof;
608

609
    use super::*;
610

611
    #[test]
612
    fn test_expect_lazy() {
613
        let buf = b"Hello World".to_vec();
614
        let cursor = futures_lite::io::Cursor::new(buf);
615
        let mut stream = Stream::new(cursor);
616

617
        futures_lite::future::block_on(async {
618
            let found = stream.expect_lazy("World").await.unwrap();
619
            assert_eq!(b"Hello ", found.before());
620
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
621
        });
622
    }
623

624
    #[test]
625
    fn test_expect_lazy_eof() {
626
        let buf = b"Hello World".to_vec();
627
        let cursor = futures_lite::io::Cursor::new(buf);
628
        let mut stream = Stream::new(cursor);
629

630
        futures_lite::future::block_on(async {
631
            let found = stream.expect_lazy(Eof).await.unwrap();
632
            assert_eq!(b"", found.before());
633
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
634
        });
635

636
        let cursor = futures_lite::io::Cursor::new(Vec::new());
637
        let mut stream = Stream::new(cursor);
638

639
        futures_lite::future::block_on(async {
640
            let err = stream.expect_lazy("").await.unwrap_err();
641
            assert!(matches!(err, Error::Eof));
642
        });
643
    }
644

645
    #[test]
646
    fn test_expect_lazy_timeout() {
647
        futures_lite::future::block_on(async {
648
            let mut stream = Stream::new(NoEofReader::default());
649
            stream.set_expect_timeout(Some(Duration::from_millis(100)));
650

651
            stream.write_all(b"Hello").await.unwrap();
652

653
            let err = stream.expect_lazy("Hello World").await.unwrap_err();
654
            assert!(matches!(err, Error::ExpectTimeout));
655

656
            stream.write_all(b" World").await.unwrap();
657
            let found = stream.expect_lazy("World").await.unwrap();
658
            assert_eq!(b"Hello ", found.before());
659
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
660
        });
661
    }
662

663
    #[test]
664
    fn test_expect_gready() {
665
        let buf = b"Hello World".to_vec();
666
        let cursor = futures_lite::io::Cursor::new(buf);
667
        let mut stream = Stream::new(cursor);
668

669
        futures_lite::future::block_on(async {
670
            let found = stream.expect_gready("World").await.unwrap();
671
            assert_eq!(b"Hello ", found.before());
672
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
673
        });
674
    }
675

676
    #[test]
677
    fn test_expect_gready_eof() {
678
        let buf = b"Hello World".to_vec();
679
        let cursor = futures_lite::io::Cursor::new(buf);
680
        let mut stream = Stream::new(cursor);
681

682
        futures_lite::future::block_on(async {
683
            let found = stream.expect_gready(Eof).await.unwrap();
684
            assert_eq!(b"", found.before());
685
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
686
        });
687

688
        let cursor = futures_lite::io::Cursor::new(Vec::new());
689
        let mut stream = Stream::new(cursor);
690

691
        futures_lite::future::block_on(async {
692
            let err = stream.expect_gready("").await.unwrap_err();
693
            assert!(matches!(err, Error::Eof));
694
        });
695
    }
696

697
    #[test]
698
    fn test_expect_gready_timeout() {
699
        futures_lite::future::block_on(async {
700
            let mut stream = Stream::new(NoEofReader::default());
701
            stream.set_expect_timeout(Some(Duration::from_millis(100)));
702

703
            stream.write_all(b"Hello").await.unwrap();
704

705
            let err = stream.expect_gready("Hello World").await.unwrap_err();
706
            assert!(matches!(err, Error::ExpectTimeout));
707

708
            stream.write_all(b" World").await.unwrap();
709
            let found = stream.expect_gready("World").await.unwrap();
710
            assert_eq!(b"Hello ", found.before());
711
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
712
        });
713
    }
714

715
    #[test]
716
    fn test_check() {
717
        let buf = b"Hello World".to_vec();
718
        let cursor = futures_lite::io::Cursor::new(buf);
719
        let mut stream = Stream::new(cursor);
720

721
        futures_lite::future::block_on(async {
722
            let found = stream.check("World").await.unwrap();
723
            assert_eq!(b"Hello ", found.before());
724
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
725
        });
726
    }
727

728
    #[test]
729
    fn test_is_matched() {
730
        let mut stream = Stream::new(NoEofReader::default());
731
        futures_lite::future::block_on(async {
732
            stream.write_all(b"Hello World").await.unwrap();
733
            assert!(stream.is_matched("World").await.unwrap());
734
            assert!(!stream.is_matched("*****").await.unwrap());
735

736
            let found = stream.check("World").await.unwrap();
737
            assert_eq!(b"Hello ", found.before());
738
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
739
        });
740
    }
741

742
    #[derive(Debug, Default)]
743
    struct NoEofReader {
744
        data: Vec<u8>,
745
    }
746

747
    impl AsyncWrite for NoEofReader {
748
        fn poll_write(
×
749
            mut self: Pin<&mut Self>,
750
            _: &mut Context<'_>,
751
            buf: &[u8],
752
        ) -> Poll<io::Result<usize>> {
753
            self.data.extend(buf);
×
754
            Poll::Ready(Ok(buf.len()))
×
755
        }
756

757
        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
×
758
            Poll::Ready(Ok(()))
×
759
        }
760

761
        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
×
762
            Poll::Ready(Ok(()))
×
763
        }
764
    }
765

766
    impl AsyncRead for NoEofReader {
767
        fn poll_read(
×
768
            mut self: Pin<&mut Self>,
769
            _: &mut Context<'_>,
770
            mut buf: &mut [u8],
771
        ) -> Poll<io::Result<usize>> {
772
            if self.data.is_empty() {
×
773
                return Poll::Pending;
×
774
            }
775

776
            let n = std::io::Write::write(&mut buf, &self.data)?;
×
777
            let _ = self.data.drain(..n);
×
778
            Poll::Ready(Ok(n))
×
779
        }
780
    }
781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc