• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8131961689

03 Mar 2024 05:43PM UTC coverage: 55.931% (-0.6%) from 56.575%
8131961689

Pull #68

github

web-flow
Merge c48d7fe3a into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

219 of 376 new or added lines in 14 files covered. (58.24%)

155 existing lines in 6 files now uncovered.

1490 of 2664 relevant lines covered (55.93%)

3.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/session/async_session.rs
1
//! Module contains an async version of Session structure.
2

3
use std::{
4
    io::{self, IoSliceMut},
5
    pin::Pin,
6
    task::{Context, Poll},
7
    time::Duration,
8
};
9

10
use futures_lite::{
11
    ready, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
12
};
13

14
use crate::{
15
    process::{Healthcheck, Termios},
16
    AsyncExpect, Captures, Error, Expect, Needle,
17
};
18

19
/// Session represents a spawned process and its streams.
20
/// It controls the process and the communication with it.
21
#[derive(Debug)]
22
pub struct Session<P, S> {
23
    process: P,
24
    stream: Stream<S>,
25
}
26

27
// Get back to the solution where Logger is just `dyn Write` instead of all this magic with the type system.
28

29
impl<P, S> Session<P, S> {
30
    /// Create a new session.
31
    pub fn new(process: P, stream: S) -> io::Result<Self> {
×
32
        Ok(Self {
×
33
            process,
×
34
            stream: Stream::new(stream),
×
35
        })
36
    }
37

38
    /// Get a reference to original stream.
39
    pub fn get_stream(&self) -> &S {
×
40
        self.stream.as_ref()
×
41
    }
42

43
    /// Get a mut reference to original stream.
44
    pub fn get_stream_mut(&mut self) -> &mut S {
×
45
        self.stream.as_mut()
×
46
    }
47

48
    /// Get a reference to a process running program.
49
    pub fn get_process(&self) -> &P {
×
50
        &self.process
×
51
    }
52

53
    /// Get a mut reference to a process running program.
54
    pub fn get_process_mut(&mut self) -> &mut P {
×
55
        &mut self.process
×
56
    }
57

58
    /// Set the pty session's expect timeout.
59
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
60
        self.stream.set_expect_timeout(expect_timeout);
×
61
    }
62

63
    /// Set the expect algorithm to be either greedy or lazy.
64
    ///
65
    /// The default algorithm is greedy.
66
    ///
67
    /// See [Session::expect].
68
    pub fn set_expect_lazy(&mut self, is_lazy: bool) {
×
69
        self.stream.expect_lazy = is_lazy;
×
70
    }
71

72
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R>(
×
73
        mut self,
74
        new_stream: F,
75
    ) -> Result<Session<P, R>, Error> {
76
        let buf = self.stream.get_available().to_owned();
×
77

78
        let stream = self.stream.into_inner();
×
79
        let stream = new_stream(stream);
×
80
        let mut session = Session::new(self.process, stream)?;
×
81
        session.stream.keep(&buf);
×
82
        Ok(session)
×
83
    }
84
}
85

86
impl<P, S> AsyncExpect for Session<P, S>
87
where
88
    S: AsyncWrite + AsyncRead + Unpin,
89
{
90
    async fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
91
    where
92
        N: Needle,
93
    {
94
        match self.stream.expect_lazy {
×
95
            true => self.stream.expect_lazy(needle).await,
×
96
            false => self.stream.expect_gready(needle).await,
×
97
        }
98
    }
99

100
    async fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
101
    where
102
        N: Needle,
103
    {
UNCOV
104
        self.stream.check(needle).await
×
105
    }
106

107
    async fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
108
    where
109
        N: Needle,
110
    {
UNCOV
111
        self.stream.is_matched(needle).await
×
112
    }
113

114
    async fn send<B>(&mut self, buf: B) -> Result<(), Error>
115
    where
116
        B: AsRef<[u8]>,
117
    {
NEW
118
        self.stream.write_all(buf.as_ref()).await.map_err(Error::IO)
×
119
    }
120

121
    async fn send_line<B>(&mut self, buf: B) -> Result<(), Error>
122
    where
123
        B: AsRef<[u8]>,
124
    {
125
        #[cfg(windows)]
126
        const LINE_ENDING: &[u8] = b"\r\n";
127
        #[cfg(not(windows))]
128
        const LINE_ENDING: &[u8] = b"\n";
129

130
        self.stream.write_all(buf.as_ref()).await?;
×
131
        self.stream.write_all(LINE_ENDING).await?;
×
132

133
        Ok(())
×
134
    }
135
}
136

137
impl<P, S> Healthcheck for Session<P, S>
138
where
139
    P: Healthcheck,
140
{
141
    type Status = P::Status;
142

143
    /// Verifies whether process is still alive.
NEW
144
    fn is_alive(&self) -> io::Result<bool> {
×
NEW
145
        P::is_alive(self.get_process())
×
146
    }
147

NEW
148
    fn get_status(&self) -> io::Result<Self::Status> {
×
NEW
149
        P::get_status(self.get_process())
×
150
    }
151
}
152

153
impl<P, S> AsyncWrite for Session<P, S>
154
where
155
    P: Unpin,
156
    S: AsyncWrite + Unpin,
157
{
UNCOV
158
    fn poll_write(
×
159
        self: Pin<&mut Self>,
160
        cx: &mut Context<'_>,
161
        buf: &[u8],
162
    ) -> Poll<io::Result<usize>> {
163
        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
×
164
    }
165

166
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
167
        Pin::new(&mut self.stream).poll_flush(cx)
×
168
    }
169

170
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
171
        Pin::new(&mut self.stream).poll_close(cx)
×
172
    }
173

174
    fn poll_write_vectored(
×
175
        mut self: Pin<&mut Self>,
176
        cx: &mut Context<'_>,
177
        bufs: &[io::IoSlice<'_>],
178
    ) -> Poll<io::Result<usize>> {
179
        Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
×
180
    }
181
}
182

183
impl<P, S> AsyncRead for Session<P, S>
184
where
185
    P: Unpin,
186
    S: AsyncRead + Unpin,
187
{
UNCOV
188
    fn poll_read(
×
189
        mut self: Pin<&mut Self>,
190
        cx: &mut Context<'_>,
191
        buf: &mut [u8],
192
    ) -> Poll<io::Result<usize>> {
193
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
194
    }
195
}
196

197
impl<P, S> AsyncBufRead for Session<P, S>
198
where
199
    P: Unpin,
200
    S: AsyncRead + Unpin,
201
{
202
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
203
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
204
    }
205

206
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
207
        Pin::new(&mut self.stream).consume(amt);
×
208
    }
209
}
210

211
impl<P, S> Termios for Session<P, S>
212
where
213
    P: Termios,
214
{
NEW
215
    fn is_echo(&self) -> io::Result<bool> {
×
NEW
216
        P::is_echo(self.get_process())
×
217
    }
218

NEW
219
    fn set_echo(&mut self, on: bool) -> io::Result<bool> {
×
NEW
220
        P::set_echo(self.get_process_mut(), on)
×
221
    }
222
}
223

224
/// Stream wraps a buffered IO stream together with the expect settings.
225
/// It stores the expect timeout and whether matching is lazy or greedy.
226
#[derive(Debug)]
227
struct Stream<S> {
228
    stream: BufferedStream<S>,
229
    expect_timeout: Option<Duration>,
230
    expect_lazy: bool,
231
}
232

233
impl<S> Stream<S> {
234
    /// Creates an async IO stream.
235
    fn new(stream: S) -> Self {
×
236
        Self {
237
            stream: BufferedStream::new(stream),
×
238
            expect_timeout: Some(Duration::from_millis(10000)),
×
239
            expect_lazy: false,
240
        }
241
    }
242

243
    /// Returns a reference to original stream.
244
    fn as_ref(&self) -> &S {
×
245
        &self.stream.stream
×
246
    }
247

248
    /// Returns a mut reference to original stream.
249
    fn as_mut(&mut self) -> &mut S {
×
250
        &mut self.stream.stream
×
251
    }
252

253
    /// Set the pty session's expect timeout.
254
    fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
255
        self.expect_timeout = expect_timeout;
×
256
    }
257

258
    /// Saves bytes in the inner buffer.
259
    /// They'll be pushed to the end of the buffer.
260
    fn keep(&mut self, buf: &[u8]) {
×
261
        self.stream.keep(buf);
×
262
    }
263

264
    /// Get an inner buffer.
265
    fn get_available(&mut self) -> &[u8] {
×
266
        self.stream.buffer()
×
267
    }
268

269
    /// Returns an inner IO stream.
270
    fn into_inner(self) -> S {
×
271
        self.stream.stream
×
272
    }
273
}
274

275
impl<S> Stream<S>
276
where
277
    S: AsyncRead + Unpin,
278
{
279
    async fn expect_gready<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
280
        let expect_timeout = self.expect_timeout;
×
281

282
        let expect_future = async {
×
283
            let mut eof = false;
×
284
            loop {
×
285
                let data = self.stream.buffer();
×
286

287
                let found = Needle::check(&needle, data, eof)?;
×
288

289
                if !found.is_empty() {
×
290
                    let end_index = Captures::right_most_index(&found);
×
291
                    let involved_bytes = data[..end_index].to_vec();
×
292
                    self.stream.consume(end_index);
×
293

294
                    return Ok(Captures::new(involved_bytes, found));
×
295
                }
296

297
                if eof {
×
298
                    return Err(Error::Eof);
×
299
                }
300

301
                eof = self.stream.fill().await? == 0;
×
302
            }
303
        };
304

305
        if let Some(timeout) = expect_timeout {
×
306
            let timeout_future = futures_timer::Delay::new(timeout);
×
307
            futures_lite::future::or(expect_future, async {
×
308
                timeout_future.await;
×
309
                Err(Error::ExpectTimeout)
×
310
            })
311
            .await
×
312
        } else {
313
            expect_future.await
×
314
        }
315
    }
316

317
    async fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
318
    where
319
        N: Needle,
320
    {
321
        let expect_timeout = self.expect_timeout;
×
322
        let expect_future = async {
×
323
            // We read by byte to make things as lazy as possible.
324
            //
325
            // It's chose is important in using Regex as a Needle.
326
            // Imagine we have a `\d+` regex.
327
            // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
328
            //
329
            // The second reason is
330
            // if we wouldn't read by byte EOF indication could be lost.
331
            // And next blocking std::io::Read operation could be blocked forever.
332
            //
333
            // We could read all data available via `read_available` to reduce IO operations,
334
            // but in such case we would need to keep a EOF indicator internally in stream,
335
            // which is OK if EOF happens onces, but I am not sure if this is a case.
336

337
            let mut checked_length = 0;
×
338
            let mut eof = false;
×
339
            loop {
×
340
                let available = self.stream.buffer();
×
341
                let is_buffer_checked = checked_length == available.len();
×
342
                if is_buffer_checked {
×
343
                    let n = self.stream.fill().await?;
×
344
                    eof = n == 0;
×
345
                }
346

347
                // We intentinally not increase the counter
348
                // and run check one more time even though the data isn't changed.
349
                // Because it may be important for custom implementations of Needle.
350
                let available = self.stream.buffer();
×
351
                if checked_length < available.len() {
×
352
                    checked_length += 1;
×
353
                }
354

355
                let data = &available[..checked_length];
×
356
                let found = Needle::check(&needle, data, eof)?;
×
357
                if !found.is_empty() {
×
358
                    let end_index = Captures::right_most_index(&found);
×
359
                    let involved_bytes = data[..end_index].to_vec();
×
360
                    self.stream.consume(end_index);
×
361
                    return Ok(Captures::new(involved_bytes, found));
×
362
                }
363

364
                if eof {
×
365
                    return Err(Error::Eof);
×
366
                }
367
            }
368
        };
369

370
        if let Some(timeout) = expect_timeout {
×
371
            let timeout_future = futures_timer::Delay::new(timeout);
×
372
            futures_lite::future::or(expect_future, async {
×
373
                timeout_future.await;
×
374
                Err(Error::ExpectTimeout)
×
375
            })
376
            .await
×
377
        } else {
378
            expect_future.await
×
379
        }
380
    }
381

382
    /// Is matched checks if a pattern is matched.
383
    /// It doesn't consumes bytes from stream.
384
    async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
×
385
        let eof = self.try_fill().await?;
×
386
        let buf = self.stream.buffer();
×
387

388
        let found = needle.check(buf, eof)?;
×
389
        if !found.is_empty() {
×
390
            return Ok(true);
×
391
        }
392

393
        if eof {
×
394
            return Err(Error::Eof);
×
395
        }
396

397
        Ok(false)
398
    }
399

400
    /// Check checks if a pattern is matched.
401
    /// Returns empty found structure if nothing found.
402
    async fn check<E>(&mut self, needle: E) -> Result<Captures, Error>
403
    where
404
        E: Needle,
405
    {
UNCOV
406
        let eof = self.try_fill().await?;
×
407

408
        let buf = self.stream.buffer();
×
409
        let found = needle.check(buf, eof)?;
×
410
        if !found.is_empty() {
×
411
            let end_index = Captures::right_most_index(&found);
×
412
            let involved_bytes = buf[..end_index].to_vec();
×
413
            self.stream.consume(end_index);
×
414
            return Ok(Captures::new(involved_bytes, found));
×
415
        }
416

417
        if eof {
×
418
            return Err(Error::Eof);
×
419
        }
420

421
        Ok(Captures::new(Vec::new(), Vec::new()))
×
422
    }
423

424
    /// Verifyes if stream is empty or not.
425
    async fn is_empty(&mut self) -> io::Result<bool> {
×
426
        match futures_lite::future::poll_once(self.read(&mut [])).await {
×
427
            Some(Ok(0)) => Ok(true),
428
            Some(Ok(_)) => Ok(false),
429
            Some(Err(err)) => Err(err),
×
430
            None => Ok(true),
431
        }
432
    }
433

434
    async fn try_fill(&mut self) -> Result<bool, Error> {
×
435
        match futures_lite::future::poll_once(self.stream.fill()).await {
×
436
            Some(Ok(n)) => Ok(n == 0),
×
437
            Some(Err(err)) => Err(err.into()),
×
438
            None => Ok(false),
439
        }
440
    }
441
}
442

443
impl<S> AsyncWrite for Stream<S>
444
where
445
    S: AsyncWrite + Unpin,
446
{
UNCOV
447
    fn poll_write(
×
448
        mut self: Pin<&mut Self>,
449
        cx: &mut Context<'_>,
450
        buf: &[u8],
451
    ) -> Poll<io::Result<usize>> {
452
        Pin::new(&mut *self.stream.get_mut()).poll_write(cx, buf)
×
453
    }
454

455
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
456
        Pin::new(&mut *self.stream.get_mut()).poll_flush(cx)
×
457
    }
458

459
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
460
        Pin::new(&mut *self.stream.get_mut()).poll_close(cx)
×
461
    }
462

463
    fn poll_write_vectored(
×
464
        mut self: Pin<&mut Self>,
465
        cx: &mut Context<'_>,
466
        bufs: &[io::IoSlice<'_>],
467
    ) -> Poll<io::Result<usize>> {
468
        Pin::new(&mut *self.stream.get_mut()).poll_write_vectored(cx, bufs)
×
469
    }
470
}
471

472
impl<S> AsyncRead for Stream<S>
473
where
474
    S: AsyncRead + Unpin,
475
{
UNCOV
476
    fn poll_read(
×
477
        mut self: Pin<&mut Self>,
478
        cx: &mut Context<'_>,
479
        buf: &mut [u8],
480
    ) -> Poll<io::Result<usize>> {
481
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
482
    }
483

NEW
484
    fn poll_read_vectored(
×
485
        self: Pin<&mut Self>,
486
        cx: &mut Context<'_>,
487
        bufs: &mut [IoSliceMut<'_>],
488
    ) -> Poll<io::Result<usize>> {
NEW
489
        for b in bufs {
×
NEW
490
            if !b.is_empty() {
×
NEW
491
                return self.poll_read(cx, b);
×
492
            }
493
        }
494

NEW
495
        self.poll_read(cx, &mut [])
×
496
    }
497
}
498

499
impl<S> AsyncBufRead for Stream<S>
500
where
501
    S: AsyncRead + Unpin,
502
{
503
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
504
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
505
    }
506

507
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
508
        Pin::new(&mut self.stream).consume(amt);
×
509
    }
510
}
511

512
/// A stream paired with a buffer of bytes that were kept but not yet consumed.
#[derive(Debug)]
struct BufferedStream<S> {
    /// The wrapped IO stream.
    stream: S,
    /// Bytes kept for later matching or reading.
    buffer: Vec<u8>,
    /// Number of bytes of `buffer` exposed by [`BufferedStream::buffer`].
    length: usize,
}

impl<S> BufferedStream<S> {
    /// Wraps `stream` with an empty buffer.
    fn new(stream: S) -> Self {
        Self {
            stream,
            buffer: Vec::new(),
            length: 0,
        }
    }

    /// Appends `buf` to the end of the inner buffer.
    fn keep(&mut self, buf: &[u8]) {
        self.buffer.extend_from_slice(buf);
        self.length += buf.len();
    }

    /// Returns the bytes currently held in the inner buffer.
    fn buffer(&self) -> &[u8] {
        &self.buffer[..self.length]
    }

    /// Returns a mutable reference to the wrapped stream.
    fn get_mut(&mut self) -> &mut S {
        &mut self.stream
    }
}
543

544
impl<S: AsyncRead + Unpin> BufferedStream<S> {
545
    async fn fill(&mut self) -> io::Result<usize> {
×
546
        let mut buf = [0; 128];
×
547
        let n = self.stream.read(&mut buf).await?;
×
548
        self.keep(&buf[..n]);
×
549
        Ok(n)
×
550
    }
551
}
552

553
impl<S: AsyncRead + Unpin> AsyncRead for BufferedStream<S> {
554
    fn poll_read(
×
555
        mut self: Pin<&mut Self>,
556
        cx: &mut Context<'_>,
557
        buf: &mut [u8],
558
    ) -> Poll<io::Result<usize>> {
559
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
560
        let nread = std::io::Read::read(&mut rem, buf)?;
×
561
        self.consume(nread);
×
562
        Poll::Ready(Ok(nread))
×
563
    }
564

565
    fn poll_read_vectored(
×
566
        mut self: Pin<&mut Self>,
567
        cx: &mut Context<'_>,
568
        bufs: &mut [IoSliceMut<'_>],
569
    ) -> Poll<io::Result<usize>> {
570
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
571
        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
×
572
        self.consume(nread);
×
573
        Poll::Ready(Ok(nread))
×
574
    }
575
}
576

577
impl<S: AsyncRead + Unpin> AsyncBufRead for BufferedStream<S> {
578
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
579
        if self.buffer.is_empty() {
×
580
            let mut buf = [0; 128];
×
581
            let n = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
×
582
            self.keep(&buf[..n]);
×
583
        }
584

585
        let buf = self.get_mut().buffer();
×
586
        Poll::Ready(Ok(buf))
×
587
    }
588

589
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
590
        let _ = self.buffer.drain(..amt);
×
591
        self.length -= amt;
×
592
    }
593
}
594

595
#[cfg(test)]
596
mod tests {
597
    use futures_lite::AsyncWriteExt;
598

599
    use crate::Eof;
600

601
    use super::*;
602

603
    #[test]
604
    fn test_expect_lazy() {
605
        let buf = b"Hello World".to_vec();
606
        let cursor = futures_lite::io::Cursor::new(buf);
607
        let mut stream = Stream::new(cursor);
608

609
        futures_lite::future::block_on(async {
610
            let found = stream.expect_lazy("World").await.unwrap();
611
            assert_eq!(b"Hello ", found.before());
612
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
613
        });
614
    }
615

616
    #[test]
617
    fn test_expect_lazy_eof() {
618
        let buf = b"Hello World".to_vec();
619
        let cursor = futures_lite::io::Cursor::new(buf);
620
        let mut stream = Stream::new(cursor);
621

622
        futures_lite::future::block_on(async {
623
            let found = stream.expect_lazy(Eof).await.unwrap();
624
            assert_eq!(b"", found.before());
625
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
626
        });
627

628
        let cursor = futures_lite::io::Cursor::new(Vec::new());
629
        let mut stream = Stream::new(cursor);
630

631
        futures_lite::future::block_on(async {
632
            let err = stream.expect_lazy("").await.unwrap_err();
633
            assert!(matches!(err, Error::Eof));
634
        });
635
    }
636

637
    #[test]
638
    fn test_expect_lazy_timeout() {
639
        futures_lite::future::block_on(async {
640
            let mut stream = Stream::new(NoEofReader::default());
641
            stream.set_expect_timeout(Some(Duration::from_millis(100)));
642

643
            stream.write_all(b"Hello").await.unwrap();
644

645
            let err = stream.expect_lazy("Hello World").await.unwrap_err();
646
            assert!(matches!(err, Error::ExpectTimeout));
647

648
            stream.write_all(b" World").await.unwrap();
649
            let found = stream.expect_lazy("World").await.unwrap();
650
            assert_eq!(b"Hello ", found.before());
651
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
652
        });
653
    }
654

655
    #[test]
656
    fn test_expect_gready() {
657
        let buf = b"Hello World".to_vec();
658
        let cursor = futures_lite::io::Cursor::new(buf);
659
        let mut stream = Stream::new(cursor);
660

661
        futures_lite::future::block_on(async {
662
            let found = stream.expect_gready("World").await.unwrap();
663
            assert_eq!(b"Hello ", found.before());
664
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
665
        });
666
    }
667

668
    #[test]
669
    fn test_expect_gready_eof() {
670
        let buf = b"Hello World".to_vec();
671
        let cursor = futures_lite::io::Cursor::new(buf);
672
        let mut stream = Stream::new(cursor);
673

674
        futures_lite::future::block_on(async {
675
            let found = stream.expect_gready(Eof).await.unwrap();
676
            assert_eq!(b"", found.before());
677
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
678
        });
679

680
        let cursor = futures_lite::io::Cursor::new(Vec::new());
681
        let mut stream = Stream::new(cursor);
682

683
        futures_lite::future::block_on(async {
684
            let err = stream.expect_gready("").await.unwrap_err();
685
            assert!(matches!(err, Error::Eof));
686
        });
687
    }
688

689
    #[test]
690
    fn test_expect_gready_timeout() {
691
        futures_lite::future::block_on(async {
692
            let mut stream = Stream::new(NoEofReader::default());
693
            stream.set_expect_timeout(Some(Duration::from_millis(100)));
694

695
            stream.write_all(b"Hello").await.unwrap();
696

697
            let err = stream.expect_gready("Hello World").await.unwrap_err();
698
            assert!(matches!(err, Error::ExpectTimeout));
699

700
            stream.write_all(b" World").await.unwrap();
701
            let found = stream.expect_gready("World").await.unwrap();
702
            assert_eq!(b"Hello ", found.before());
703
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
704
        });
705
    }
706

707
    #[test]
708
    fn test_check() {
709
        let buf = b"Hello World".to_vec();
710
        let cursor = futures_lite::io::Cursor::new(buf);
711
        let mut stream = Stream::new(cursor);
712

713
        futures_lite::future::block_on(async {
714
            let found = stream.check("World").await.unwrap();
715
            assert_eq!(b"Hello ", found.before());
716
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
717
        });
718
    }
719

720
    #[test]
721
    fn test_is_matched() {
722
        let mut stream = Stream::new(NoEofReader::default());
723
        futures_lite::future::block_on(async {
724
            stream.write_all(b"Hello World").await.unwrap();
725
            assert!(stream.is_matched("World").await.unwrap());
726
            assert!(!stream.is_matched("*****").await.unwrap());
727

728
            let found = stream.check("World").await.unwrap();
729
            assert_eq!(b"Hello ", found.before());
730
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
731
        });
732
    }
733

734
    #[derive(Debug, Default)]
735
    struct NoEofReader {
736
        data: Vec<u8>,
737
    }
738

739
    impl AsyncWrite for NoEofReader {
740
        fn poll_write(
×
741
            mut self: Pin<&mut Self>,
742
            _: &mut Context<'_>,
743
            buf: &[u8],
744
        ) -> Poll<io::Result<usize>> {
745
            self.data.extend(buf);
×
746
            Poll::Ready(Ok(buf.len()))
×
747
        }
748

749
        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
×
750
            Poll::Ready(Ok(()))
×
751
        }
752

753
        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
×
754
            Poll::Ready(Ok(()))
×
755
        }
756
    }
757

758
    impl AsyncRead for NoEofReader {
759
        fn poll_read(
×
760
            mut self: Pin<&mut Self>,
761
            _: &mut Context<'_>,
762
            mut buf: &mut [u8],
763
        ) -> Poll<io::Result<usize>> {
764
            if self.data.is_empty() {
×
765
                return Poll::Pending;
×
766
            }
767

768
            let n = std::io::Write::write(&mut buf, &self.data)?;
×
769
            let _ = self.data.drain(..n);
×
770
            Poll::Ready(Ok(n))
×
771
        }
772
    }
773
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc