• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8126127478

02 Mar 2024 11:30PM UTC coverage: 47.529% (-9.0%) from 56.575%
8126127478

Pull #68

github

web-flow
Merge e7c63072c into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

148 of 344 new or added lines in 14 files covered. (43.02%)

728 existing lines in 15 files now uncovered.

1491 of 3137 relevant lines covered (47.53%)

3.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.38
/src/session/async_session.rs
1
//! Module contains an async version of Session structure.
2

3
use std::{
4
    io::{self, IoSliceMut},
5
    pin::Pin,
6
    task::{Context, Poll},
7
    time::Duration,
8
};
9

10
use futures_lite::{
11
    ready, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
12
};
13

14
use crate::{
15
    process::{Healthcheck, Termios},
16
    AsyncExpect, Captures, Error, Expect, Needle,
17
};
18

19
/// Session represents a spawned process and its streams.
20
/// It controlls process and communication with it.
21
#[derive(Debug)]
22
pub struct Session<P, S> {
23
    process: P,
24
    stream: Stream<S>,
25
}
26

27
// TODO: Get back to the solution where Logger is just `dyn Write` instead of all this magic with the type system.
28

29
impl<P, S> Session<P, S> {
30
    /// Create a new session.
31
    pub fn new(process: P, stream: S) -> io::Result<Self> {
10✔
32
        Ok(Self {
10✔
33
            process,
10✔
34
            stream: Stream::new(stream),
10✔
35
        })
36
    }
37

38
    /// Get a reference to original stream.
39
    pub fn get_stream(&self) -> &S {
×
40
        self.stream.as_ref()
×
41
    }
42

43
    /// Get a mut reference to original stream.
44
    pub fn get_stream_mut(&mut self) -> &mut S {
×
45
        self.stream.as_mut()
×
46
    }
47

48
    /// Get a reference to a process running program.
49
    pub fn get_process(&self) -> &P {
5✔
50
        &self.process
5✔
51
    }
52

53
    /// Get a mut reference to a process running program.
54
    pub fn get_process_mut(&mut self) -> &mut P {
5✔
55
        &mut self.process
5✔
56
    }
57

58
    /// Set the pty session's expect timeout.
59
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
1✔
60
        self.stream.set_expect_timeout(expect_timeout);
1✔
61
    }
62

63
    /// Set a expect algorithm to be either gready or lazy.
64
    ///
65
    /// Default algorithm is gready.
66
    ///
67
    /// See [Session::expect].
68
    pub fn set_expect_lazy(&mut self, is_lazy: bool) {
1✔
69
        self.stream.expect_lazy = is_lazy;
1✔
70
    }
71

72
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R>(
2✔
73
        mut self,
74
        new_stream: F,
75
    ) -> Result<Session<P, R>, Error> {
76
        let buf = self.stream.get_available().to_owned();
4✔
77

78
        let stream = self.stream.into_inner();
4✔
79
        let stream = new_stream(stream);
2✔
80
        let mut session = Session::new(self.process, stream)?;
2✔
81
        session.stream.keep(&buf);
4✔
82
        Ok(session)
2✔
83
    }
84
}
85

86
impl<P, S: AsyncRead + Unpin> Session<P, S> {
87
    /// Expect waits until a pattern is matched.
88
    ///
89
    /// If the method returns [Ok] it is guaranteed that at least 1 match was found.
90
    ///
91
    /// The match algorthm can be either
92
    ///     - gready
93
    ///     - lazy
94
    ///
95
    /// You can set one via [Session::set_expect_lazy].
96
    /// Default version is gready.
97
    ///
98
    /// The implications are.
99
    ///
100
    /// Imagine you use [crate::Regex] `"\d+"` to find a match.
101
    /// And your process outputs `123`.
102
    /// In case of lazy approach we will match `1`.
103
    /// Where's in case of gready one we will match `123`.
104
    ///
105
    /// # Example
106
    ///
107
    #[cfg_attr(windows, doc = "```no_run")]
108
    #[cfg_attr(unix, doc = "```")]
109
    /// # futures_lite::future::block_on(async {
110
    /// let mut p = expectrl::spawn("echo 123").unwrap();
111
    /// let m = p.expect(expectrl::Regex("\\d+")).await.unwrap();
112
    /// assert_eq!(m.get(0).unwrap(), b"123");
113
    /// # });
114
    /// ```
115
    ///
116
    #[cfg_attr(windows, doc = "```no_run")]
117
    #[cfg_attr(unix, doc = "```")]
118
    /// # futures_lite::future::block_on(async {
119
    /// let mut p = expectrl::spawn("echo 123").unwrap();
120
    /// p.set_expect_lazy(true);
121
    /// let m = p.expect(expectrl::Regex("\\d+")).await.unwrap();
122
    /// assert_eq!(m.get(0).unwrap(), b"1");
123
    /// # });
124
    /// ```
125
    ///
126
    /// This behaviour is different from [Session::check].
127
    ///
128
    /// It returns an error if timeout is reached.
129
    /// You can specify a timeout value by [Session::set_expect_timeout] method.
130
    pub async fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
14✔
131
    where
132
        N: Needle,
133
    {
134
        match self.stream.expect_lazy {
7✔
135
            true => self.stream.expect_lazy(needle).await,
2✔
136
            false => self.stream.expect_gready(needle).await,
8✔
137
        }
138
    }
139

140
    /// Check checks if a pattern is matched.
141
    /// Returns empty found structure if nothing found.
142
    ///
143
    /// Is a non blocking version of [Session::expect].
144
    /// But its strategy of matching is different from it.
145
    /// It makes search agains all bytes available.
146
    ///
147
    #[cfg_attr(any(target_os = "macos", windows), doc = "```no_run")]
148
    #[cfg_attr(not(any(target_os = "macos", windows)), doc = "```")]
149
    /// # futures_lite::future::block_on(async {
150
    /// let mut p = expectrl::spawn("echo 123").unwrap();
151
    /// // wait to guarantee that check will successed (most likely)
152
    /// std::thread::sleep(std::time::Duration::from_secs(1));
153
    /// let m = p.check(expectrl::Regex("\\d+")).await.unwrap();
154
    /// assert_eq!(m.get(0).unwrap(), b"123");
155
    /// # });
156
    /// ```
157
    pub async fn check<E>(&mut self, needle: E) -> Result<Captures, Error>
10✔
158
    where
159
        E: Needle,
160
    {
161
        self.stream.check(needle).await
10✔
162
    }
163

164
    /// Is matched checks if a pattern is matched.
165
    /// It doesn't consumes bytes from stream.
166
    pub async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
20✔
167
        self.stream.is_matched(needle).await
8✔
168
    }
169

170
    /// Verifyes if stream is empty or not.
171
    pub async fn is_empty(&mut self) -> io::Result<bool> {
6✔
172
        self.stream.is_empty().await
2✔
173
    }
174
}
175

176
impl<P, S> AsyncExpect for Session<P, S>
177
where
178
    S: AsyncWrite + AsyncRead + Unpin,
179
{
180
    async fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
2✔
181
    where
182
        N: Needle,
183
    {
184
        match self.stream.expect_lazy {
1✔
NEW
185
            true => self.stream.expect_lazy(needle).await,
×
186
            false => self.stream.expect_gready(needle).await,
2✔
187
        }
188
    }
189

190
    async fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
6✔
191
    where
192
        N: Needle,
193
    {
194
        self.stream.check(needle).await
6✔
195
    }
196

197
    async fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
198
    where
199
        N: Needle,
200
    {
NEW
201
        self.stream.is_matched(needle).await
×
202
    }
203

204
    async fn send<B>(&mut self, buf: B) -> Result<(), Error>
10✔
205
    where
206
        B: AsRef<[u8]>,
207
    {
208
        self.stream.write_all(buf.as_ref()).await.map_err(Error::IO)
15✔
209
    }
210

211
    async fn send_line<B>(&mut self, buf: B) -> Result<(), Error>
16✔
212
    where
213
        B: AsRef<[u8]>,
214
    {
215
        #[cfg(windows)]
216
        const LINE_ENDING: &[u8] = b"\r\n";
217
        #[cfg(not(windows))]
218
        const LINE_ENDING: &[u8] = b"\n";
219

220
        self.stream.write_all(buf.as_ref()).await?;
24✔
221
        self.stream.write_all(LINE_ENDING).await?;
24✔
222

223
        Ok(())
8✔
224
    }
225
}
226

227
impl<P, S> Healthcheck for Session<P, S>
228
where
229
    P: Healthcheck,
230
{
231
    type Status = P::Status;
232

233
    /// Verifies whether process is still alive.
NEW
234
    fn is_alive(&self) -> io::Result<bool> {
×
NEW
235
        P::is_alive(self.get_process())
×
236
    }
237

238
    fn get_status(&self) -> io::Result<Self::Status> {
2✔
239
        P::get_status(self.get_process())
2✔
240
    }
241
}
242

243
impl<P, S> AsyncWrite for Session<P, S>
244
where
245
    P: Unpin,
246
    S: AsyncWrite + Unpin,
247
{
248
    fn poll_write(
2✔
249
        self: Pin<&mut Self>,
250
        cx: &mut Context<'_>,
251
        buf: &[u8],
252
    ) -> Poll<io::Result<usize>> {
253
        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
2✔
254
    }
255

256
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1✔
257
        Pin::new(&mut self.stream).poll_flush(cx)
2✔
258
    }
259

260
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
261
        Pin::new(&mut self.stream).poll_close(cx)
×
262
    }
263

264
    fn poll_write_vectored(
×
265
        mut self: Pin<&mut Self>,
266
        cx: &mut Context<'_>,
267
        bufs: &[io::IoSlice<'_>],
268
    ) -> Poll<io::Result<usize>> {
269
        Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
×
270
    }
271
}
272

273
impl<P, S> AsyncRead for Session<P, S>
274
where
275
    P: Unpin,
276
    S: AsyncRead + Unpin,
277
{
278
    fn poll_read(
7✔
279
        mut self: Pin<&mut Self>,
280
        cx: &mut Context<'_>,
281
        buf: &mut [u8],
282
    ) -> Poll<io::Result<usize>> {
283
        Pin::new(&mut self.stream).poll_read(cx, buf)
14✔
284
    }
285
}
286

287
impl<P, S> AsyncBufRead for Session<P, S>
288
where
289
    P: Unpin,
290
    S: AsyncRead + Unpin,
291
{
292
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
4✔
293
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
4✔
294
    }
295

296
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
4✔
297
        Pin::new(&mut self.stream).consume(amt);
8✔
298
    }
299
}
300

301
impl<P, S> Termios for Session<P, S>
302
where
303
    P: Termios,
304
{
305
    fn is_echo(&self) -> io::Result<bool> {
2✔
306
        P::is_echo(self.get_process())
2✔
307
    }
308

309
    fn set_echo(&mut self, on: bool) -> io::Result<bool> {
2✔
310
        P::set_echo(self.get_process_mut(), on)
2✔
311
    }
312
}
313

314
/// Session represents a spawned process and its streams.
315
/// It controlls process and communication with it.
316
#[derive(Debug)]
317
struct Stream<S> {
318
    stream: BufferedStream<S>,
319
    expect_timeout: Option<Duration>,
320
    expect_lazy: bool,
321
}
322

323
impl<S> Stream<S> {
324
    /// Creates an async IO stream.
325
    fn new(stream: S) -> Self {
12✔
326
        Self {
327
            stream: BufferedStream::new(stream),
12✔
328
            expect_timeout: Some(Duration::from_millis(10000)),
24✔
329
            expect_lazy: false,
330
        }
331
    }
332

333
    /// Returns a reference to original stream.
334
    fn as_ref(&self) -> &S {
×
335
        &self.stream.stream
×
336
    }
337

338
    /// Returns a mut reference to original stream.
339
    fn as_mut(&mut self) -> &mut S {
×
340
        &mut self.stream.stream
×
341
    }
342

343
    /// Set the pty session's expect timeout.
344
    fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
2✔
345
        self.expect_timeout = expect_timeout;
2✔
346
    }
347

348
    /// Save a bytes in inner buffer.
349
    /// They'll be pushed to the end of the buffer.
350
    fn keep(&mut self, buf: &[u8]) {
2✔
351
        self.stream.keep(buf);
2✔
352
    }
353

354
    /// Get an inner buffer.
355
    fn get_available(&mut self) -> &[u8] {
2✔
356
        self.stream.buffer()
2✔
357
    }
358

359
    /// Returns an inner IO stream.
360
    fn into_inner(self) -> S {
2✔
361
        self.stream.stream
2✔
362
    }
363
}
364

365
impl<S> Stream<S>
366
where
367
    S: AsyncRead + Unpin,
368
{
369
    async fn expect_gready<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
66✔
370
        let expect_timeout = self.expect_timeout;
11✔
371

372
        let expect_future = async {
44✔
373
            let mut eof = false;
11✔
374
            loop {
22✔
375
                let data = self.stream.buffer();
22✔
376

377
                let found = Needle::check(&needle, data, eof)?;
11✔
378

379
                if !found.is_empty() {
22✔
380
                    let end_index = Captures::right_most_index(&found);
20✔
381
                    let involved_bytes = data[..end_index].to_vec();
20✔
382
                    self.stream.consume(end_index);
10✔
383

384
                    return Ok(Captures::new(involved_bytes, found));
10✔
385
                }
386

387
                if eof {
11✔
388
                    return Err(Error::Eof);
2✔
389
                }
390

391
                eof = self.stream.fill().await? == 0;
49✔
392
            }
393
        };
394

395
        if let Some(timeout) = expect_timeout {
22✔
396
            let timeout_future = futures_timer::Delay::new(timeout);
22✔
397
            futures_lite::future::or(expect_future, async {
50✔
398
                timeout_future.await;
9✔
399
                Err(Error::ExpectTimeout)
2✔
400
            })
401
            .await
36✔
402
        } else {
403
            expect_future.await
2✔
404
        }
405
    }
406

407
    async fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
8✔
408
    where
409
        N: Needle,
410
    {
411
        let expect_timeout = self.expect_timeout;
4✔
412
        let expect_future = async {
16✔
413
            // We read by byte to make things as lazy as possible.
414
            //
415
            // It's chose is important in using Regex as a Needle.
416
            // Imagine we have a `\d+` regex.
417
            // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
418
            //
419
            // The second reason is
420
            // if we wouldn't read by byte EOF indication could be lost.
421
            // And next blocking std::io::Read operation could be blocked forever.
422
            //
423
            // We could read all data available via `read_available` to reduce IO operations,
424
            // but in such case we would need to keep a EOF indicator internally in stream,
425
            // which is OK if EOF happens onces, but I am not sure if this is a case.
426

427
            let mut checked_length = 0;
4✔
428
            let mut eof = false;
4✔
429
            loop {
8✔
430
                let available = self.stream.buffer();
8✔
431
                let is_buffer_checked = checked_length == available.len();
4✔
432
                if is_buffer_checked {
8✔
433
                    let n = self.stream.fill().await?;
14✔
434
                    eof = n == 0;
4✔
435
                }
436

437
                // We intentinally not increase the counter
438
                // and run check one more time even though the data isn't changed.
439
                // Because it may be important for custom implementations of Needle.
440
                let available = self.stream.buffer();
4✔
441
                if checked_length < available.len() {
8✔
442
                    checked_length += 1;
4✔
443
                }
444

445
                let data = &available[..checked_length];
8✔
446
                let found = Needle::check(&needle, data, eof)?;
8✔
447
                if !found.is_empty() {
8✔
448
                    let end_index = Captures::right_most_index(&found);
8✔
449
                    let involved_bytes = data[..end_index].to_vec();
8✔
450
                    self.stream.consume(end_index);
4✔
451
                    return Ok(Captures::new(involved_bytes, found));
4✔
452
                }
453

454
                if eof {
4✔
455
                    return Err(Error::Eof);
1✔
456
                }
457
            }
458
        };
459

460
        if let Some(timeout) = expect_timeout {
8✔
461
            let timeout_future = futures_timer::Delay::new(timeout);
8✔
462
            futures_lite::future::or(expect_future, async {
17✔
463
                timeout_future.await;
3✔
464
                Err(Error::ExpectTimeout)
1✔
465
            })
466
            .await
13✔
467
        } else {
468
            expect_future.await
×
469
        }
470
    }
471

472
    /// Is matched checks if a pattern is matched.
473
    /// It doesn't consumes bytes from stream.
474
    async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
30✔
475
        let eof = self.try_fill().await?;
10✔
476
        let buf = self.stream.buffer();
10✔
477

478
        let found = needle.check(buf, eof)?;
5✔
479
        if !found.is_empty() {
10✔
480
            return Ok(true);
5✔
481
        }
482

483
        if eof {
2✔
484
            return Err(Error::Eof);
×
485
        }
486

487
        Ok(false)
488
    }
489

490
    /// Check checks if a pattern is matched.
491
    /// Returns empty found structure if nothing found.
492
    async fn check<E>(&mut self, needle: E) -> Result<Captures, Error>
16✔
493
    where
494
        E: Needle,
495
    {
496
        let eof = self.try_fill().await?;
16✔
497

498
        let buf = self.stream.buffer();
16✔
499
        let found = needle.check(buf, eof)?;
16✔
500
        if !found.is_empty() {
16✔
501
            let end_index = Captures::right_most_index(&found);
14✔
502
            let involved_bytes = buf[..end_index].to_vec();
14✔
503
            self.stream.consume(end_index);
7✔
504
            return Ok(Captures::new(involved_bytes, found));
7✔
505
        }
506

507
        if eof {
3✔
508
            return Err(Error::Eof);
×
509
        }
510

511
        Ok(Captures::new(Vec::new(), Vec::new()))
6✔
512
    }
513

514
    /// Verifyes if stream is empty or not.
515
    async fn is_empty(&mut self) -> io::Result<bool> {
5✔
516
        match futures_lite::future::poll_once(self.read(&mut [])).await {
5✔
517
            Some(Ok(0)) => Ok(true),
518
            Some(Ok(_)) => Ok(false),
519
            Some(Err(err)) => Err(err),
×
520
            None => Ok(true),
521
        }
522
    }
523

524
    async fn try_fill(&mut self) -> Result<bool, Error> {
20✔
525
        match futures_lite::future::poll_once(self.stream.fill()).await {
12✔
526
            Some(Ok(n)) => Ok(n == 0),
4✔
527
            Some(Err(err)) => Err(err.into()),
×
528
            None => Ok(false),
529
        }
530
    }
531
}
532

533
impl<S> AsyncWrite for Stream<S>
534
where
535
    S: AsyncWrite + Unpin,
536
{
537
    fn poll_write(
10✔
538
        mut self: Pin<&mut Self>,
539
        cx: &mut Context<'_>,
540
        buf: &[u8],
541
    ) -> Poll<io::Result<usize>> {
542
        Pin::new(&mut *self.stream.get_mut()).poll_write(cx, buf)
20✔
543
    }
544

545
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1✔
546
        Pin::new(&mut *self.stream.get_mut()).poll_flush(cx)
2✔
547
    }
548

549
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
550
        Pin::new(&mut *self.stream.get_mut()).poll_close(cx)
×
551
    }
552

553
    fn poll_write_vectored(
×
554
        mut self: Pin<&mut Self>,
555
        cx: &mut Context<'_>,
556
        bufs: &[io::IoSlice<'_>],
557
    ) -> Poll<io::Result<usize>> {
558
        Pin::new(&mut *self.stream.get_mut()).poll_write_vectored(cx, bufs)
×
559
    }
560
}
561

562
impl<S> AsyncRead for Stream<S>
563
where
564
    S: AsyncRead + Unpin,
565
{
566
    fn poll_read(
7✔
567
        mut self: Pin<&mut Self>,
568
        cx: &mut Context<'_>,
569
        buf: &mut [u8],
570
    ) -> Poll<io::Result<usize>> {
571
        Pin::new(&mut self.stream).poll_read(cx, buf)
14✔
572
    }
573

NEW
574
    fn poll_read_vectored(
×
575
        self: Pin<&mut Self>,
576
        cx: &mut Context<'_>,
577
        bufs: &mut [IoSliceMut<'_>],
578
    ) -> Poll<io::Result<usize>> {
NEW
579
        for b in bufs {
×
NEW
580
            if !b.is_empty() {
×
NEW
581
                return self.poll_read(cx, b);
×
582
            }
583
        }
584

NEW
585
        self.poll_read(cx, &mut [])
×
586
    }
587
}
588

589
impl<S> AsyncBufRead for Stream<S>
590
where
591
    S: AsyncRead + Unpin,
592
{
593
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
4✔
594
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
4✔
595
    }
596

597
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
4✔
598
        Pin::new(&mut self.stream).consume(amt);
8✔
599
    }
600
}
601

602
/// An IO stream paired with a growable buffer of bytes which were stored
/// but not yet consumed.
///
/// `length` is the number of bytes of `buffer` exposed by [`BufferedStream::buffer`].
#[derive(Debug)]
struct BufferedStream<S> {
    stream: S,
    buffer: Vec<u8>,
    length: usize,
}

impl<S> BufferedStream<S> {
    /// Wraps `stream` with an empty buffer.
    fn new(stream: S) -> Self {
        let buffer = Vec::new();
        Self {
            stream,
            buffer,
            length: 0,
        }
    }

    /// Appends `buf` to the end of the buffer.
    fn keep(&mut self, buf: &[u8]) {
        self.length += buf.len();
        self.buffer.extend_from_slice(buf);
    }

    /// Returns the buffered bytes.
    fn buffer(&self) -> &[u8] {
        let visible = self.length;
        &self.buffer[..visible]
    }

    /// Mutably borrows the wrapped stream.
    fn get_mut(&mut self) -> &mut S {
        &mut self.stream
    }
}
633

634
impl<S: AsyncRead + Unpin> BufferedStream<S> {
635
    async fn fill(&mut self) -> io::Result<usize> {
35✔
636
        let mut buf = [0; 128];
7✔
637
        let n = self.stream.read(&mut buf).await?;
24✔
638
        self.keep(&buf[..n]);
14✔
639
        Ok(n)
7✔
640
    }
641
}
642

643
impl<S: AsyncRead + Unpin> AsyncRead for BufferedStream<S> {
644
    fn poll_read(
7✔
645
        mut self: Pin<&mut Self>,
646
        cx: &mut Context<'_>,
647
        buf: &mut [u8],
648
    ) -> Poll<io::Result<usize>> {
649
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
14✔
650
        let nread = std::io::Read::read(&mut rem, buf)?;
14✔
651
        self.consume(nread);
7✔
652
        Poll::Ready(Ok(nread))
7✔
653
    }
654

655
    fn poll_read_vectored(
×
656
        mut self: Pin<&mut Self>,
657
        cx: &mut Context<'_>,
658
        bufs: &mut [IoSliceMut<'_>],
659
    ) -> Poll<io::Result<usize>> {
660
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
661
        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
×
662
        self.consume(nread);
×
663
        Poll::Ready(Ok(nread))
×
664
    }
665
}
666

667
impl<S: AsyncRead + Unpin> AsyncBufRead for BufferedStream<S> {
668
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
9✔
669
        if self.buffer.is_empty() {
9✔
670
            let mut buf = [0; 128];
6✔
671
            let n = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
18✔
672
            self.keep(&buf[..n]);
6✔
673
        }
674

675
        let buf = self.get_mut().buffer();
18✔
676
        Poll::Ready(Ok(buf))
9✔
677
    }
678

679
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
11✔
680
        let _ = self.buffer.drain(..amt);
11✔
681
        self.length -= amt;
11✔
682
    }
683
}
684

685
#[cfg(test)]
mod tests {
    use futures_lite::AsyncWriteExt;

    use crate::Eof;

    use super::*;

    /// Builds a `Stream` over an in-memory cursor which holds a copy of `data`.
    fn cursor_stream(data: &[u8]) -> Stream<futures_lite::io::Cursor<Vec<u8>>> {
        Stream::new(futures_lite::io::Cursor::new(data.to_vec()))
    }

    #[test]
    fn test_expect_lazy() {
        let mut stream = cursor_stream(b"Hello World");

        futures_lite::future::block_on(async {
            let found = stream.expect_lazy("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_expect_lazy_eof() {
        // With data: the `Eof` needle matches everything the stream produced.
        let mut stream = cursor_stream(b"Hello World");

        futures_lite::future::block_on(async {
            let found = stream.expect_lazy(Eof).await.unwrap();
            assert_eq!(b"", found.before());
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
        });

        // Without data: an empty needle on an empty stream ends with an EOF error.
        let mut stream = cursor_stream(b"");

        futures_lite::future::block_on(async {
            let err = stream.expect_lazy("").await.unwrap_err();
            assert!(matches!(err, Error::Eof));
        });
    }

    #[test]
    fn test_expect_lazy_timeout() {
        futures_lite::future::block_on(async {
            let mut stream = Stream::new(NoEofReader::default());
            stream.set_expect_timeout(Some(Duration::from_millis(100)));

            stream.write_all(b"Hello").await.unwrap();

            // Only a part of the needle is available, so the call must time out.
            let err = stream.expect_lazy("Hello World").await.unwrap_err();
            assert!(matches!(err, Error::ExpectTimeout));

            stream.write_all(b" World").await.unwrap();
            let found = stream.expect_lazy("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_expect_gready() {
        let mut stream = cursor_stream(b"Hello World");

        futures_lite::future::block_on(async {
            let found = stream.expect_gready("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_expect_gready_eof() {
        // With data: the `Eof` needle matches everything the stream produced.
        let mut stream = cursor_stream(b"Hello World");

        futures_lite::future::block_on(async {
            let found = stream.expect_gready(Eof).await.unwrap();
            assert_eq!(b"", found.before());
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
        });

        // Without data: an empty needle on an empty stream ends with an EOF error.
        let mut stream = cursor_stream(b"");

        futures_lite::future::block_on(async {
            let err = stream.expect_gready("").await.unwrap_err();
            assert!(matches!(err, Error::Eof));
        });
    }

    #[test]
    fn test_expect_gready_timeout() {
        futures_lite::future::block_on(async {
            let mut stream = Stream::new(NoEofReader::default());
            stream.set_expect_timeout(Some(Duration::from_millis(100)));

            stream.write_all(b"Hello").await.unwrap();

            // Only a part of the needle is available, so the call must time out.
            let err = stream.expect_gready("Hello World").await.unwrap_err();
            assert!(matches!(err, Error::ExpectTimeout));

            stream.write_all(b" World").await.unwrap();
            let found = stream.expect_gready("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_check() {
        let mut stream = cursor_stream(b"Hello World");

        futures_lite::future::block_on(async {
            let found = stream.check("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_is_matched() {
        let mut stream = Stream::new(NoEofReader::default());
        futures_lite::future::block_on(async {
            stream.write_all(b"Hello World").await.unwrap();
            assert!(stream.is_matched("World").await.unwrap());
            assert!(!stream.is_matched("*****").await.unwrap());

            // `is_matched` must not have consumed anything.
            let found = stream.check("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    /// An in-memory pipe: written bytes can be read back, and a read on an
    /// empty pipe stays pending instead of reporting EOF.
    #[derive(Debug, Default)]
    struct NoEofReader {
        data: Vec<u8>,
    }

    impl AsyncWrite for NoEofReader {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let written = buf.len();
            self.data.extend(buf);
            Poll::Ready(Ok(written))
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    impl AsyncRead for NoEofReader {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _: &mut Context<'_>,
            mut buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // No waker is registered: the tests only rely on the timeout firing.
            if self.data.is_empty() {
                return Poll::Pending;
            }

            // Copy as much as fits into `buf` and drop it from the pipe.
            let copied = std::io::Write::write(&mut buf, &self.data)?;
            let _ = self.data.drain(..copied);
            Poll::Ready(Ok(copied))
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc