• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 4369552515

pending completion
4369552515

Pull #61

github

GitHub
Merge ff4fae653 into 794d67830
Pull Request #61: Windows CI

1471 of 2612 relevant lines covered (56.32%)

3.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/session/async_session.rs
1
//! Module contains an async version of Session structure.
2

3
use std::{
4
    io::{self, IoSliceMut},
5
    ops::{Deref, DerefMut},
6
    pin::Pin,
7
    task::{Context, Poll},
8
    time::Duration,
9
};
10

11
use futures_lite::{
12
    ready, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
13
};
14

15
use crate::{process::Healthcheck, Captures, Error, Needle};
16

17
/// Session represents a spawned process and its streams.
18
/// It controlls process and communication with it.
19
#[derive(Debug)]
20
pub struct Session<P, S> {
21
    process: P,
22
    stream: Stream<S>,
23
}
24

25
// TODO: Get back to the solution where Logger is just `dyn Write` instead of all this magic with the type system.
26

27
impl<P, S> Session<P, S> {
28
    pub(crate) fn new(process: P, stream: S) -> io::Result<Self> {
×
29
        Ok(Self {
×
30
            process,
×
31
            stream: Stream::new(stream),
×
32
        })
33
    }
34

35
    /// Get a reference to original stream.
36
    pub fn get_stream(&self) -> &S {
×
37
        self.stream.as_ref()
×
38
    }
39

40
    /// Get a mut reference to original stream.
41
    pub fn get_stream_mut(&mut self) -> &mut S {
×
42
        self.stream.as_mut()
×
43
    }
44

45
    /// Get a reference to a process running program.
46
    pub fn get_process(&self) -> &P {
×
47
        &self.process
×
48
    }
49

50
    /// Get a mut reference to a process running program.
51
    pub fn get_process_mut(&mut self) -> &mut P {
×
52
        &mut self.process
×
53
    }
54

55
    /// Set the pty session's expect timeout.
56
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
57
        self.stream.set_expect_timeout(expect_timeout);
×
58
    }
59

60
    /// Set a expect algorithm to be either gready or lazy.
61
    ///
62
    /// Default algorithm is gready.
63
    ///
64
    /// See [Session::expect].
65
    pub fn set_expect_lazy(&mut self, is_lazy: bool) {
×
66
        self.stream.expect_lazy = is_lazy;
×
67
    }
68

69
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R>(
×
70
        mut self,
71
        new_stream: F,
72
    ) -> Result<Session<P, R>, Error> {
73
        let buf = self.stream.get_available().to_owned();
×
74

75
        let stream = self.stream.into_inner();
×
76
        let stream = new_stream(stream);
×
77
        let mut session = Session::new(self.process, stream)?;
×
78
        session.stream.keep(&buf);
×
79
        Ok(session)
×
80
    }
81
}
82

83
impl<P: Healthcheck, S> Session<P, S> {
84
    /// Verifies whether process is still alive.
85
    pub fn is_alive(&mut self) -> Result<bool, Error> {
×
86
        self.process.is_alive().map_err(|err| err.into())
×
87
    }
88
}
89

90
impl<P, S: AsyncRead + Unpin> Session<P, S> {
91
    /// Expect waits until a pattern is matched.
92
    ///
93
    /// If the method returns [Ok] it is guaranteed that at least 1 match was found.
94
    ///
95
    /// The match algorthm can be either
96
    ///     - gready
97
    ///     - lazy
98
    ///
99
    /// You can set one via [Session::set_expect_lazy].
100
    /// Default version is gready.
101
    ///
102
    /// The implications are.
103
    ///
104
    /// Imagine you use [crate::Regex] `"\d+"` to find a match.
105
    /// And your process outputs `123`.
106
    /// In case of lazy approach we will match `1`.
107
    /// Where's in case of gready one we will match `123`.
108
    ///
109
    /// # Example
110
    ///
111
    #[cfg_attr(windows, doc = "```no_run")]
112
    #[cfg_attr(unix, doc = "```")]
113
    /// # futures_lite::future::block_on(async {
114
    /// let mut p = expectrl::spawn("echo 123").unwrap();
115
    /// let m = p.expect(expectrl::Regex("\\d+")).await.unwrap();
116
    /// assert_eq!(m.get(0).unwrap(), b"123");
117
    /// # });
118
    /// ```
119
    ///
120
    #[cfg_attr(windows, doc = "```no_run")]
121
    #[cfg_attr(unix, doc = "```")]
122
    /// # futures_lite::future::block_on(async {
123
    /// let mut p = expectrl::spawn("echo 123").unwrap();
124
    /// p.set_expect_lazy(true);
125
    /// let m = p.expect(expectrl::Regex("\\d+")).await.unwrap();
126
    /// assert_eq!(m.get(0).unwrap(), b"1");
127
    /// # });
128
    /// ```
129
    ///
130
    /// This behaviour is different from [Session::check].
131
    ///
132
    /// It returns an error if timeout is reached.
133
    /// You can specify a timeout value by [Session::set_expect_timeout] method.
134
    pub async fn expect<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
135
        match self.stream.expect_lazy {
×
136
            true => self.stream.expect_lazy(needle).await,
×
137
            false => self.stream.expect_gready(needle).await,
×
138
        }
139
    }
140

141
    /// Check checks if a pattern is matched.
142
    /// Returns empty found structure if nothing found.
143
    ///
144
    /// Is a non blocking version of [Session::expect].
145
    /// But its strategy of matching is different from it.
146
    /// It makes search agains all bytes available.
147
    ///
148
    #[cfg_attr(windows, doc = "```no_run")]
149
    #[cfg_attr(unix, doc = "```")]
150
    /// # futures_lite::future::block_on(async {
151
    /// let mut p = expectrl::spawn("echo 123").unwrap();
152
    /// // wait to guarantee that check will successed (most likely)
153
    /// std::thread::sleep(std::time::Duration::from_secs(1));
154
    /// let m = p.check(expectrl::Regex("\\d+")).await.unwrap();
155
    /// assert_eq!(m.get(0).unwrap(), b"123");
156
    /// # });
157
    /// ```
158
    pub async fn check<E: Needle>(&mut self, needle: E) -> Result<Captures, Error> {
×
159
        self.stream.check(needle).await
×
160
    }
161

162
    /// Is matched checks if a pattern is matched.
163
    /// It doesn't consumes bytes from stream.
164
    pub async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
×
165
        self.stream.is_matched(needle).await
×
166
    }
167

168
    /// Verifyes if stream is empty or not.
169
    pub async fn is_empty(&mut self) -> io::Result<bool> {
×
170
        self.stream.is_empty().await
×
171
    }
172
}
173

174
impl<Proc, S: AsyncWrite + Unpin> Session<Proc, S> {
175
    /// Send text to child’s STDIN.
176
    ///
177
    /// You can also use methods from [std::io::Write] instead.
178
    ///
179
    /// # Example
180
    ///
181
    /// ```
182
    /// use expectrl::{spawn, ControlCode};
183
    ///
184
    /// let mut proc = spawn("cat").unwrap();
185
    ///
186
    /// # futures_lite::future::block_on(async {
187
    /// proc.send("Hello");
188
    /// proc.send(b"World");
189
    /// proc.send(ControlCode::try_from("^C").unwrap());
190
    /// # });
191
    /// ```
192
    pub async fn send<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
×
193
        self.stream.write_all(buf.as_ref()).await
×
194
    }
195

196
    /// Send a line to child’s STDIN.
197
    ///
198
    /// # Example
199
    ///
200
    /// ```
201
    /// use expectrl::{spawn, ControlCode};
202
    ///
203
    /// let mut proc = spawn("cat").unwrap();
204
    ///
205
    /// # futures_lite::future::block_on(async {
206
    /// proc.send_line("Hello");
207
    /// proc.send_line(b"World");
208
    /// proc.send_line(ControlCode::try_from("^C").unwrap());
209
    /// # });
210
    /// ```
211
    pub async fn send_line<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
×
212
        #[cfg(windows)]
213
        const LINE_ENDING: &[u8] = b"\r\n";
214
        #[cfg(not(windows))]
215
        const LINE_ENDING: &[u8] = b"\n";
216

217
        self.stream.write_all(buf.as_ref()).await?;
×
218
        self.stream.write_all(LINE_ENDING).await?;
×
219

220
        Ok(())
×
221
    }
222
}
223

224
impl<P, S> Deref for Session<P, S> {
225
    type Target = P;
226

227
    fn deref(&self) -> &Self::Target {
×
228
        &self.process
×
229
    }
230
}
231

232
impl<P, S> DerefMut for Session<P, S> {
233
    fn deref_mut(&mut self) -> &mut Self::Target {
×
234
        &mut self.process
×
235
    }
236
}
237

238
impl<P: Unpin, S: AsyncWrite + Unpin> AsyncWrite for Session<P, S> {
239
    fn poll_write(
×
240
        self: Pin<&mut Self>,
241
        cx: &mut Context<'_>,
242
        buf: &[u8],
243
    ) -> Poll<io::Result<usize>> {
244
        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
×
245
    }
246

247
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
248
        Pin::new(&mut self.stream).poll_flush(cx)
×
249
    }
250

251
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
252
        Pin::new(&mut self.stream).poll_close(cx)
×
253
    }
254

255
    fn poll_write_vectored(
×
256
        mut self: Pin<&mut Self>,
257
        cx: &mut Context<'_>,
258
        bufs: &[io::IoSlice<'_>],
259
    ) -> Poll<io::Result<usize>> {
260
        Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
×
261
    }
262
}
263

264
impl<P: Unpin, S: AsyncRead + Unpin> AsyncRead for Session<P, S> {
265
    fn poll_read(
×
266
        mut self: Pin<&mut Self>,
267
        cx: &mut Context<'_>,
268
        buf: &mut [u8],
269
    ) -> Poll<io::Result<usize>> {
270
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
271
    }
272
}
273

274
impl<P: Unpin, S: AsyncRead + Unpin> AsyncBufRead for Session<P, S> {
275
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
276
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
277
    }
278

279
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
280
        Pin::new(&mut self.stream).consume(amt);
×
281
    }
282
}
283

284
/// Session represents a spawned process and its streams.
285
/// It controlls process and communication with it.
286
#[derive(Debug)]
287
struct Stream<S> {
288
    stream: BufferedStream<S>,
289
    expect_timeout: Option<Duration>,
290
    expect_lazy: bool,
291
}
292

293
impl<S> Stream<S> {
294
    /// Creates an async IO stream.
295
    fn new(stream: S) -> Self {
×
296
        Self {
297
            stream: BufferedStream::new(stream),
×
298
            expect_timeout: Some(Duration::from_millis(10000)),
×
299
            expect_lazy: false,
300
        }
301
    }
302

303
    /// Returns a reference to original stream.
304
    fn as_ref(&self) -> &S {
×
305
        &self.stream.stream
×
306
    }
307

308
    /// Returns a mut reference to original stream.
309
    fn as_mut(&mut self) -> &mut S {
×
310
        &mut self.stream.stream
×
311
    }
312

313
    /// Set the pty session's expect timeout.
314
    fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
315
        self.expect_timeout = expect_timeout;
×
316
    }
317

318
    /// Save a bytes in inner buffer.
319
    /// They'll be pushed to the end of the buffer.
320
    fn keep(&mut self, buf: &[u8]) {
×
321
        self.stream.keep(buf);
×
322
    }
323

324
    /// Get an inner buffer.
325
    fn get_available(&mut self) -> &[u8] {
×
326
        self.stream.buffer()
×
327
    }
328

329
    /// Returns an inner IO stream.
330
    fn into_inner(self) -> S {
×
331
        self.stream.stream
×
332
    }
333
}
334

335
impl<S: AsyncRead + Unpin> Stream<S> {
336
    async fn expect_gready<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
337
        let expect_timeout = self.expect_timeout;
×
338

339
        let expect_future = async {
×
340
            let mut eof = false;
×
341
            loop {
×
342
                let data = self.stream.buffer();
×
343

344
                let found = Needle::check(&needle, data, eof)?;
×
345

346
                if !found.is_empty() {
×
347
                    let end_index = Captures::right_most_index(&found);
×
348
                    let involved_bytes = data[..end_index].to_vec();
×
349
                    self.stream.consume(end_index);
×
350

351
                    return Ok(Captures::new(involved_bytes, found));
×
352
                }
353

354
                if eof {
×
355
                    return Err(Error::Eof);
×
356
                }
357

358
                eof = self.stream.fill().await? == 0;
×
359
            }
360
        };
361

362
        if let Some(timeout) = expect_timeout {
×
363
            let timeout_future = futures_timer::Delay::new(timeout);
×
364
            futures_lite::future::or(expect_future, async {
×
365
                timeout_future.await;
×
366
                Err(Error::ExpectTimeout)
×
367
            })
368
            .await
×
369
        } else {
370
            expect_future.await
×
371
        }
372
    }
373

374
    async fn expect_lazy<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
375
        let expect_timeout = self.expect_timeout;
×
376
        let expect_future = async {
×
377
            // We read by byte to make things as lazy as possible.
378
            //
379
            // It's chose is important in using Regex as a Needle.
380
            // Imagine we have a `\d+` regex.
381
            // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
382
            //
383
            // The second reason is
384
            // if we wouldn't read by byte EOF indication could be lost.
385
            // And next blocking std::io::Read operation could be blocked forever.
386
            //
387
            // We could read all data available via `read_available` to reduce IO operations,
388
            // but in such case we would need to keep a EOF indicator internally in stream,
389
            // which is OK if EOF happens onces, but I am not sure if this is a case.
390

391
            let mut checked_length = 0;
×
392
            let mut eof = false;
×
393
            loop {
×
394
                let available = self.stream.buffer();
×
395
                let is_buffer_checked = checked_length == available.len();
×
396
                if is_buffer_checked {
×
397
                    let n = self.stream.fill().await?;
×
398
                    eof = n == 0;
×
399
                }
400

401
                // We intentinally not increase the counter
402
                // and run check one more time even though the data isn't changed.
403
                // Because it may be important for custom implementations of Needle.
404
                let available = self.stream.buffer();
×
405
                if checked_length < available.len() {
×
406
                    checked_length += 1;
×
407
                }
408

409
                let data = &available[..checked_length];
×
410
                let found = Needle::check(&needle, data, eof)?;
×
411
                if !found.is_empty() {
×
412
                    let end_index = Captures::right_most_index(&found);
×
413
                    let involved_bytes = data[..end_index].to_vec();
×
414
                    self.stream.consume(end_index);
×
415
                    return Ok(Captures::new(involved_bytes, found));
×
416
                }
417

418
                if eof {
×
419
                    return Err(Error::Eof);
×
420
                }
421
            }
422
        };
423

424
        if let Some(timeout) = expect_timeout {
×
425
            let timeout_future = futures_timer::Delay::new(timeout);
×
426
            futures_lite::future::or(expect_future, async {
×
427
                timeout_future.await;
×
428
                Err(Error::ExpectTimeout)
×
429
            })
430
            .await
×
431
        } else {
432
            expect_future.await
×
433
        }
434
    }
435

436
    /// Is matched checks if a pattern is matched.
437
    /// It doesn't consumes bytes from stream.
438
    async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
×
439
        let eof = self.try_fill().await?;
×
440
        let buf = self.stream.buffer();
×
441

442
        let found = needle.check(buf, eof)?;
×
443
        if !found.is_empty() {
×
444
            return Ok(true);
×
445
        }
446

447
        if eof {
×
448
            return Err(Error::Eof);
×
449
        }
450

451
        Ok(false)
452
    }
453

454
    /// Check checks if a pattern is matched.
455
    /// Returns empty found structure if nothing found.
456
    ///
457
    /// Is a non blocking version of [Session::expect].
458
    /// But its strategy of matching is different from it.
459
    /// It makes search agains all bytes available.
460
    ///
461
    #[cfg_attr(windows, doc = "```no_run")]
462
    #[cfg_attr(unix, doc = "```")]
463
    /// # futures_lite::future::block_on(async {
464
    /// #
465
    /// let mut p = expectrl::spawn("echo 123").unwrap();
466
    /// // wait to guarantee that check will successed (most likely)
467
    /// std::thread::sleep(std::time::Duration::from_secs(1));
468
    /// let m = p.check(expectrl::Regex("\\d+")).await.unwrap();
469
    /// assert_eq!(m.get(0).unwrap(), b"123");
470
    /// #
471
    /// # });
472
    /// ```
473
    async fn check<E: Needle>(&mut self, needle: E) -> Result<Captures, Error> {
×
474
        let eof = self.try_fill().await?;
×
475

476
        let buf = self.stream.buffer();
×
477
        let found = needle.check(buf, eof)?;
×
478
        if !found.is_empty() {
×
479
            let end_index = Captures::right_most_index(&found);
×
480
            let involved_bytes = buf[..end_index].to_vec();
×
481
            self.stream.consume(end_index);
×
482
            return Ok(Captures::new(involved_bytes, found));
×
483
        }
484

485
        if eof {
×
486
            return Err(Error::Eof);
×
487
        }
488

489
        Ok(Captures::new(Vec::new(), Vec::new()))
×
490
    }
491

492
    /// Verifyes if stream is empty or not.
493
    async fn is_empty(&mut self) -> io::Result<bool> {
×
494
        match futures_lite::future::poll_once(self.read(&mut [])).await {
×
495
            Some(Ok(0)) => Ok(true),
496
            Some(Ok(_)) => Ok(false),
497
            Some(Err(err)) => Err(err),
×
498
            None => Ok(true),
499
        }
500
    }
501

502
    async fn try_fill(&mut self) -> Result<bool, Error> {
×
503
        match futures_lite::future::poll_once(self.stream.fill()).await {
×
504
            Some(Ok(n)) => Ok(n == 0),
×
505
            Some(Err(err)) => Err(err.into()),
×
506
            None => Ok(false),
507
        }
508
    }
509
}
510

511
impl<S: AsyncWrite + Unpin> AsyncWrite for Stream<S> {
512
    fn poll_write(
×
513
        mut self: Pin<&mut Self>,
514
        cx: &mut Context<'_>,
515
        buf: &[u8],
516
    ) -> Poll<io::Result<usize>> {
517
        Pin::new(&mut *self.stream.get_mut()).poll_write(cx, buf)
×
518
    }
519

520
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
521
        Pin::new(&mut *self.stream.get_mut()).poll_flush(cx)
×
522
    }
523

524
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
525
        Pin::new(&mut *self.stream.get_mut()).poll_close(cx)
×
526
    }
527

528
    fn poll_write_vectored(
×
529
        mut self: Pin<&mut Self>,
530
        cx: &mut Context<'_>,
531
        bufs: &[io::IoSlice<'_>],
532
    ) -> Poll<io::Result<usize>> {
533
        Pin::new(&mut *self.stream.get_mut()).poll_write_vectored(cx, bufs)
×
534
    }
535
}
536

537
impl<S: AsyncRead + Unpin> AsyncRead for Stream<S> {
538
    fn poll_read(
×
539
        mut self: Pin<&mut Self>,
540
        cx: &mut Context<'_>,
541
        buf: &mut [u8],
542
    ) -> Poll<io::Result<usize>> {
543
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
544
    }
545
}
546

547
impl<S: AsyncRead + Unpin> AsyncBufRead for Stream<S> {
548
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
549
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
550
    }
551

552
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
553
        Pin::new(&mut self.stream).consume(amt);
×
554
    }
555
}
556

557
/// An IO stream paired with a buffer of bytes which were read from it
/// (or pushed back via `keep`) but not consumed yet.
#[derive(Debug)]
struct BufferedStream<S> {
    // The wrapped IO stream.
    stream: S,
    // Bytes waiting to be consumed, in the order they were kept.
    buffer: Vec<u8>,
    // Number of valid bytes in `buffer`; updated together with `buffer`.
    length: usize,
}

impl<S> BufferedStream<S> {
    /// Wraps `stream` with an empty buffer.
    fn new(stream: S) -> Self {
        Self {
            stream,
            buffer: Vec::new(),
            length: 0,
        }
    }

    /// Appends `buf` to the end of the buffer.
    fn keep(&mut self, buf: &[u8]) {
        self.buffer.extend_from_slice(buf);
        self.length += buf.len();
    }

    /// Returns the bytes currently held in the buffer.
    fn buffer(&self) -> &[u8] {
        &self.buffer[..self.length]
    }

    /// Returns an exclusive reference to the wrapped stream.
    fn get_mut(&mut self) -> &mut S {
        &mut self.stream
    }
}
588

589
impl<S: AsyncRead + Unpin> BufferedStream<S> {
590
    async fn fill(&mut self) -> io::Result<usize> {
×
591
        let mut buf = [0; 128];
×
592
        let n = self.stream.read(&mut buf).await?;
×
593
        self.keep(&buf[..n]);
×
594
        Ok(n)
×
595
    }
596
}
597

598
impl<S: AsyncRead + Unpin> AsyncRead for BufferedStream<S> {
599
    fn poll_read(
×
600
        mut self: Pin<&mut Self>,
601
        cx: &mut Context<'_>,
602
        buf: &mut [u8],
603
    ) -> Poll<io::Result<usize>> {
604
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
605
        let nread = std::io::Read::read(&mut rem, buf)?;
×
606
        self.consume(nread);
×
607
        Poll::Ready(Ok(nread))
×
608
    }
609

610
    fn poll_read_vectored(
×
611
        mut self: Pin<&mut Self>,
612
        cx: &mut Context<'_>,
613
        bufs: &mut [IoSliceMut<'_>],
614
    ) -> Poll<io::Result<usize>> {
615
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
616
        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
×
617
        self.consume(nread);
×
618
        Poll::Ready(Ok(nread))
×
619
    }
620
}
621

622
impl<S: AsyncRead + Unpin> AsyncBufRead for BufferedStream<S> {
623
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
624
        if self.buffer.is_empty() {
×
625
            let mut buf = [0; 128];
×
626
            let n = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
×
627
            self.keep(&buf[..n]);
×
628
        }
629

630
        let buf = self.get_mut().buffer();
×
631
        Poll::Ready(Ok(buf))
×
632
    }
633

634
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
635
        let _ = self.buffer.drain(..amt);
×
636
        self.length -= amt;
×
637
    }
638
}
639

640
#[cfg(test)]
mod tests {
    use futures_lite::AsyncWriteExt;

    use crate::Eof;

    use super::*;

    use futures_lite::{future::block_on, io::Cursor};

    /// Builds a stream which reads `bytes` from an in-memory cursor.
    fn cursor_stream(bytes: &[u8]) -> Stream<Cursor<Vec<u8>>> {
        Stream::new(Cursor::new(bytes.to_vec()))
    }

    #[test]
    fn test_expect_lazy() {
        let mut stream = cursor_stream(b"Hello World");

        let found = block_on(stream.expect_lazy("World")).unwrap();
        assert_eq!(b"Hello ", found.before());
        assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
    }

    #[test]
    fn test_expect_lazy_eof() {
        let mut stream = cursor_stream(b"Hello World");

        let found = block_on(stream.expect_lazy(Eof)).unwrap();
        assert_eq!(b"", found.before());
        assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());

        // An empty stream reports EOF right away.
        let mut stream = cursor_stream(b"");

        let err = block_on(stream.expect_lazy("")).unwrap_err();
        assert!(matches!(err, Error::Eof));
    }

    #[test]
    fn test_expect_lazy_timeout() {
        block_on(async {
            let mut stream = Stream::new(NoEofReader::default());
            stream.set_expect_timeout(Some(Duration::from_millis(100)));

            stream.write_all(b"Hello").await.unwrap();

            // Only a part of the needle is available, so the call has to time out.
            let err = stream.expect_lazy("Hello World").await.unwrap_err();
            assert!(matches!(err, Error::ExpectTimeout));

            stream.write_all(b" World").await.unwrap();
            let found = stream.expect_lazy("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_expect_gready() {
        let mut stream = cursor_stream(b"Hello World");

        let found = block_on(stream.expect_gready("World")).unwrap();
        assert_eq!(b"Hello ", found.before());
        assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
    }

    #[test]
    fn test_expect_gready_eof() {
        let mut stream = cursor_stream(b"Hello World");

        let found = block_on(stream.expect_gready(Eof)).unwrap();
        assert_eq!(b"", found.before());
        assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());

        // An empty stream reports EOF right away.
        let mut stream = cursor_stream(b"");

        let err = block_on(stream.expect_gready("")).unwrap_err();
        assert!(matches!(err, Error::Eof));
    }

    #[test]
    fn test_expect_gready_timeout() {
        block_on(async {
            let mut stream = Stream::new(NoEofReader::default());
            stream.set_expect_timeout(Some(Duration::from_millis(100)));

            stream.write_all(b"Hello").await.unwrap();

            // Only a part of the needle is available, so the call has to time out.
            let err = stream.expect_gready("Hello World").await.unwrap_err();
            assert!(matches!(err, Error::ExpectTimeout));

            stream.write_all(b" World").await.unwrap();
            let found = stream.expect_gready("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    #[test]
    fn test_check() {
        let mut stream = cursor_stream(b"Hello World");

        let found = block_on(stream.check("World")).unwrap();
        assert_eq!(b"Hello ", found.before());
        assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
    }

    #[test]
    fn test_is_matched() {
        let mut stream = Stream::new(NoEofReader::default());
        block_on(async {
            stream.write_all(b"Hello World").await.unwrap();
            assert!(stream.is_matched("World").await.unwrap());
            assert!(!stream.is_matched("*****").await.unwrap());

            // `is_matched` must not have consumed anything.
            let found = stream.check("World").await.unwrap();
            assert_eq!(b"Hello ", found.before());
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
        });
    }

    /// An in-memory pipe: written bytes can be read back, and a read on an
    /// empty pipe is pending instead of signalling EOF.
    #[derive(Debug, Default)]
    struct NoEofReader {
        data: Vec<u8>,
    }

    impl AsyncWrite for NoEofReader {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let written = buf.len();
            self.data.extend(buf);
            Poll::Ready(Ok(written))
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    impl AsyncRead for NoEofReader {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _: &mut Context<'_>,
            mut buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            if self.data.is_empty() {
                return Poll::Pending;
            }

            // Hand out as many stored bytes as fit into `buf` and forget them.
            let moved = std::io::Write::write(&mut buf, &self.data)?;
            let _ = self.data.drain(..moved);
            Poll::Ready(Ok(moved))
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc