• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 4552806474

pending completion
4552806474

push

github

Maxim Zhiburt
Refactorings

27 of 27 new or added lines in 9 files covered. (100.0%)

1477 of 2615 relevant lines covered (56.48%)

3.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/session/async_session.rs
1
//! Module contains an async version of Session structure.
2

3
use std::{
4
    io::{self, IoSliceMut},
5
    ops::{Deref, DerefMut},
6
    pin::Pin,
7
    task::{Context, Poll},
8
    time::Duration,
9
};
10

11
use futures_lite::{
12
    ready, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
13
};
14

15
use crate::{process::Healthcheck, Captures, Error, Needle};
16

17
/// Session represents a spawned process and its streams.
18
/// It controls the process and the communication with it.
19
#[derive(Debug)]
20
pub struct Session<P = super::OsProcess, S = super::OsProcessStream> {
21
    process: P,
22
    stream: Stream<S>,
23
}
24

25
// Get back to the solution where Logger is just `dyn Write` instead of all this magic with the type system.
26

27
impl<P, S> Session<P, S> {
28
    /// Create a new session.
29
    pub fn new(process: P, stream: S) -> io::Result<Self> {
×
30
        Ok(Self {
×
31
            process,
×
32
            stream: Stream::new(stream),
×
33
        })
34
    }
35

36
    /// Get a reference to original stream.
37
    pub fn get_stream(&self) -> &S {
×
38
        self.stream.as_ref()
×
39
    }
40

41
    /// Get a mut reference to original stream.
42
    pub fn get_stream_mut(&mut self) -> &mut S {
×
43
        self.stream.as_mut()
×
44
    }
45

46
    /// Get a reference to a process running program.
47
    pub fn get_process(&self) -> &P {
×
48
        &self.process
×
49
    }
50

51
    /// Get a mut reference to a process running program.
52
    pub fn get_process_mut(&mut self) -> &mut P {
×
53
        &mut self.process
×
54
    }
55

56
    /// Set the pty session's expect timeout.
57
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
58
        self.stream.set_expect_timeout(expect_timeout);
×
59
    }
60

61
    /// Set the expect algorithm to be either greedy or lazy.
62
    ///
63
    /// Default algorithm is greedy.
64
    ///
65
    /// See [Session::expect].
66
    pub fn set_expect_lazy(&mut self, is_lazy: bool) {
×
67
        self.stream.expect_lazy = is_lazy;
×
68
    }
69

70
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R>(
×
71
        mut self,
72
        new_stream: F,
73
    ) -> Result<Session<P, R>, Error> {
74
        let buf = self.stream.get_available().to_owned();
×
75

76
        let stream = self.stream.into_inner();
×
77
        let stream = new_stream(stream);
×
78
        let mut session = Session::new(self.process, stream)?;
×
79
        session.stream.keep(&buf);
×
80
        Ok(session)
×
81
    }
82
}
83

84
impl<P: Healthcheck, S> Session<P, S> {
85
    /// Verifies whether process is still alive.
86
    pub fn is_alive(&mut self) -> Result<bool, Error> {
×
87
        self.process.is_alive().map_err(|err| err.into())
×
88
    }
89
}
90

91
impl<P, S: AsyncRead + Unpin> Session<P, S> {
92
    /// Expect waits until a pattern is matched.
93
    ///
94
    /// If the method returns [Ok] it is guaranteed that at least 1 match was found.
95
    ///
96
    /// The match algorithm can be either
97
    ///     - greedy
98
    ///     - lazy
99
    ///
100
    /// You can set one via [Session::set_expect_lazy].
101
    /// Default version is greedy.
102
    ///
103
    /// The implications are.
104
    ///
105
    /// Imagine you use [crate::Regex] `"\d+"` to find a match.
106
    /// And your process outputs `123`.
107
    /// In case of lazy approach we will match `1`.
108
    /// Whereas in case of the greedy one we will match `123`.
109
    ///
110
    /// # Example
111
    ///
112
    #[cfg_attr(windows, doc = "```no_run")]
113
    #[cfg_attr(unix, doc = "```")]
114
    /// # futures_lite::future::block_on(async {
115
    /// let mut p = expectrl::spawn("echo 123").unwrap();
116
    /// let m = p.expect(expectrl::Regex("\\d+")).await.unwrap();
117
    /// assert_eq!(m.get(0).unwrap(), b"123");
118
    /// # });
119
    /// ```
120
    ///
121
    #[cfg_attr(windows, doc = "```no_run")]
122
    #[cfg_attr(unix, doc = "```")]
123
    /// # futures_lite::future::block_on(async {
124
    /// let mut p = expectrl::spawn("echo 123").unwrap();
125
    /// p.set_expect_lazy(true);
126
    /// let m = p.expect(expectrl::Regex("\\d+")).await.unwrap();
127
    /// assert_eq!(m.get(0).unwrap(), b"1");
128
    /// # });
129
    /// ```
130
    ///
131
    /// This behaviour is different from [Session::check].
132
    ///
133
    /// It returns an error if timeout is reached.
134
    /// You can specify a timeout value by [Session::set_expect_timeout] method.
135
    pub async fn expect<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
136
        match self.stream.expect_lazy {
×
137
            true => self.stream.expect_lazy(needle).await,
×
138
            false => self.stream.expect_gready(needle).await,
×
139
        }
140
    }
141

142
    /// Check checks if a pattern is matched.
143
    /// Returns empty found structure if nothing found.
144
    ///
145
    /// Is a non blocking version of [Session::expect].
146
    /// But its strategy of matching is different from it.
147
    /// It searches against all bytes available.
148
    ///
149
    #[cfg_attr(windows, doc = "```no_run")]
150
    #[cfg_attr(unix, doc = "```")]
151
    /// # futures_lite::future::block_on(async {
152
    /// let mut p = expectrl::spawn("echo 123").unwrap();
153
    /// // wait to guarantee that check will succeed (most likely)
154
    /// std::thread::sleep(std::time::Duration::from_secs(1));
155
    /// let m = p.check(expectrl::Regex("\\d+")).await.unwrap();
156
    /// assert_eq!(m.get(0).unwrap(), b"123");
157
    /// # });
158
    /// ```
159
    pub async fn check<E: Needle>(&mut self, needle: E) -> Result<Captures, Error> {
×
160
        self.stream.check(needle).await
×
161
    }
162

163
    /// Is matched checks if a pattern is matched.
164
    /// It doesn't consume bytes from the stream.
165
    pub async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
×
166
        self.stream.is_matched(needle).await
×
167
    }
168

169
    /// Verifies if stream is empty or not.
170
    pub async fn is_empty(&mut self) -> io::Result<bool> {
×
171
        self.stream.is_empty().await
×
172
    }
173
}
174

175
impl<Proc, S: AsyncWrite + Unpin> Session<Proc, S> {
176
    /// Send text to child’s STDIN.
177
    ///
178
    /// You can also use methods from [std::io::Write] instead.
179
    ///
180
    /// # Example
181
    ///
182
    /// ```
183
    /// use expectrl::{spawn, ControlCode};
184
    ///
185
    /// let mut proc = spawn("cat").unwrap();
186
    ///
187
    /// # futures_lite::future::block_on(async {
188
    /// proc.send("Hello").await.unwrap();
189
    /// proc.send(b"World").await.unwrap();
190
    /// proc.send(ControlCode::try_from("^C").unwrap()).await.unwrap();
191
    /// # });
192
    /// ```
193
    pub async fn send<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
×
194
        self.stream.write_all(buf.as_ref()).await
×
195
    }
196

197
    /// Send a line to child’s STDIN.
198
    ///
199
    /// # Example
200
    ///
201
    /// ```
202
    /// use expectrl::{spawn, ControlCode};
203
    ///
204
    /// let mut proc = spawn("cat").unwrap();
205
    ///
206
    /// # futures_lite::future::block_on(async {
207
    /// proc.send_line("Hello").await.unwrap();
208
    /// proc.send_line(b"World").await.unwrap();
209
    /// proc.send_line(ControlCode::try_from("^C").unwrap()).await.unwrap();
210
    /// # });
211
    /// ```
212
    pub async fn send_line<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
×
213
        #[cfg(windows)]
214
        const LINE_ENDING: &[u8] = b"\r\n";
215
        #[cfg(not(windows))]
216
        const LINE_ENDING: &[u8] = b"\n";
217

218
        self.stream.write_all(buf.as_ref()).await?;
×
219
        self.stream.write_all(LINE_ENDING).await?;
×
220

221
        Ok(())
×
222
    }
223
}
224

225
impl<P, S> Deref for Session<P, S> {
226
    type Target = P;
227

228
    fn deref(&self) -> &Self::Target {
×
229
        &self.process
×
230
    }
231
}
232

233
impl<P, S> DerefMut for Session<P, S> {
234
    fn deref_mut(&mut self) -> &mut Self::Target {
×
235
        &mut self.process
×
236
    }
237
}
238

239
impl<P: Unpin, S: AsyncWrite + Unpin> AsyncWrite for Session<P, S> {
240
    fn poll_write(
×
241
        self: Pin<&mut Self>,
242
        cx: &mut Context<'_>,
243
        buf: &[u8],
244
    ) -> Poll<io::Result<usize>> {
245
        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
×
246
    }
247

248
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
249
        Pin::new(&mut self.stream).poll_flush(cx)
×
250
    }
251

252
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
253
        Pin::new(&mut self.stream).poll_close(cx)
×
254
    }
255

256
    fn poll_write_vectored(
×
257
        mut self: Pin<&mut Self>,
258
        cx: &mut Context<'_>,
259
        bufs: &[io::IoSlice<'_>],
260
    ) -> Poll<io::Result<usize>> {
261
        Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
×
262
    }
263
}
264

265
impl<P: Unpin, S: AsyncRead + Unpin> AsyncRead for Session<P, S> {
266
    fn poll_read(
×
267
        mut self: Pin<&mut Self>,
268
        cx: &mut Context<'_>,
269
        buf: &mut [u8],
270
    ) -> Poll<io::Result<usize>> {
271
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
272
    }
273
}
274

275
impl<P: Unpin, S: AsyncRead + Unpin> AsyncBufRead for Session<P, S> {
276
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
277
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
278
    }
279

280
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
281
        Pin::new(&mut self.stream).consume(amt);
×
282
    }
283
}
284

285
/// Stream wraps a buffered IO stream and stores the expect settings
286
/// (the timeout and the lazy/greedy matching mode) applied to it.
287
#[derive(Debug)]
288
struct Stream<S> {
289
    stream: BufferedStream<S>,
290
    expect_timeout: Option<Duration>,
291
    expect_lazy: bool,
292
}
293

294
impl<S> Stream<S> {
295
    /// Creates an async IO stream.
296
    fn new(stream: S) -> Self {
×
297
        Self {
298
            stream: BufferedStream::new(stream),
×
299
            expect_timeout: Some(Duration::from_millis(10000)),
×
300
            expect_lazy: false,
301
        }
302
    }
303

304
    /// Returns a reference to original stream.
305
    fn as_ref(&self) -> &S {
×
306
        &self.stream.stream
×
307
    }
308

309
    /// Returns a mut reference to original stream.
310
    fn as_mut(&mut self) -> &mut S {
×
311
        &mut self.stream.stream
×
312
    }
313

314
    /// Set the pty session's expect timeout.
315
    fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
×
316
        self.expect_timeout = expect_timeout;
×
317
    }
318

319
    /// Save bytes in the inner buffer.
320
    /// They'll be pushed to the end of the buffer.
321
    fn keep(&mut self, buf: &[u8]) {
×
322
        self.stream.keep(buf);
×
323
    }
324

325
    /// Get an inner buffer.
326
    fn get_available(&mut self) -> &[u8] {
×
327
        self.stream.buffer()
×
328
    }
329

330
    /// Returns an inner IO stream.
331
    fn into_inner(self) -> S {
×
332
        self.stream.stream
×
333
    }
334
}
335

336
impl<S: AsyncRead + Unpin> Stream<S> {
337
    async fn expect_gready<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
338
        let expect_timeout = self.expect_timeout;
×
339

340
        let expect_future = async {
×
341
            let mut eof = false;
×
342
            loop {
×
343
                let data = self.stream.buffer();
×
344

345
                let found = Needle::check(&needle, data, eof)?;
×
346

347
                if !found.is_empty() {
×
348
                    let end_index = Captures::right_most_index(&found);
×
349
                    let involved_bytes = data[..end_index].to_vec();
×
350
                    self.stream.consume(end_index);
×
351

352
                    return Ok(Captures::new(involved_bytes, found));
×
353
                }
354

355
                if eof {
×
356
                    return Err(Error::Eof);
×
357
                }
358

359
                eof = self.stream.fill().await? == 0;
×
360
            }
361
        };
362

363
        if let Some(timeout) = expect_timeout {
×
364
            let timeout_future = futures_timer::Delay::new(timeout);
×
365
            futures_lite::future::or(expect_future, async {
×
366
                timeout_future.await;
×
367
                Err(Error::ExpectTimeout)
×
368
            })
369
            .await
×
370
        } else {
371
            expect_future.await
×
372
        }
373
    }
374

375
    async fn expect_lazy<N: Needle>(&mut self, needle: N) -> Result<Captures, Error> {
×
376
        let expect_timeout = self.expect_timeout;
×
377
        let expect_future = async {
×
378
            // We read by byte to make things as lazy as possible.
379
            //
380
            // This choice is important when using a Regex as a Needle.
381
            // Imagine we have a `\d+` regex.
382
            // Using such a buffer will match string `2` immediately even though another digit might follow right after.
383
            //
384
            // The second reason is
385
            // if we wouldn't read by byte EOF indication could be lost.
386
            // And next blocking std::io::Read operation could be blocked forever.
387
            //
388
            // We could read all data available via `read_available` to reduce IO operations,
389
            // but in such case we would need to keep a EOF indicator internally in stream,
390
            // which is OK if EOF happens once, but I am not sure if this is the case.
391

392
            let mut checked_length = 0;
×
393
            let mut eof = false;
×
394
            loop {
×
395
                let available = self.stream.buffer();
×
396
                let is_buffer_checked = checked_length == available.len();
×
397
                if is_buffer_checked {
×
398
                    let n = self.stream.fill().await?;
×
399
                    eof = n == 0;
×
400
                }
401

402
                // We intentionally do not increase the counter
403
                // and run check one more time even though the data isn't changed.
404
                // Because it may be important for custom implementations of Needle.
405
                let available = self.stream.buffer();
×
406
                if checked_length < available.len() {
×
407
                    checked_length += 1;
×
408
                }
409

410
                let data = &available[..checked_length];
×
411
                let found = Needle::check(&needle, data, eof)?;
×
412
                if !found.is_empty() {
×
413
                    let end_index = Captures::right_most_index(&found);
×
414
                    let involved_bytes = data[..end_index].to_vec();
×
415
                    self.stream.consume(end_index);
×
416
                    return Ok(Captures::new(involved_bytes, found));
×
417
                }
418

419
                if eof {
×
420
                    return Err(Error::Eof);
×
421
                }
422
            }
423
        };
424

425
        if let Some(timeout) = expect_timeout {
×
426
            let timeout_future = futures_timer::Delay::new(timeout);
×
427
            futures_lite::future::or(expect_future, async {
×
428
                timeout_future.await;
×
429
                Err(Error::ExpectTimeout)
×
430
            })
431
            .await
×
432
        } else {
433
            expect_future.await
×
434
        }
435
    }
436

437
    /// Is matched checks if a pattern is matched.
438
    /// It doesn't consume bytes from the stream.
439
    async fn is_matched<E: Needle>(&mut self, needle: E) -> Result<bool, Error> {
×
440
        let eof = self.try_fill().await?;
×
441
        let buf = self.stream.buffer();
×
442

443
        let found = needle.check(buf, eof)?;
×
444
        if !found.is_empty() {
×
445
            return Ok(true);
×
446
        }
447

448
        if eof {
×
449
            return Err(Error::Eof);
×
450
        }
451

452
        Ok(false)
453
    }
454

455
    /// Check checks if a pattern is matched.
456
    /// Returns empty found structure if nothing found.
457
    ///
458
    /// Is a non blocking version of [Session::expect].
459
    /// But its strategy of matching is different from it.
460
    /// It searches against all bytes available.
461
    ///
462
    #[cfg_attr(windows, doc = "```no_run")]
463
    #[cfg_attr(unix, doc = "```")]
464
    /// # futures_lite::future::block_on(async {
465
    /// #
466
    /// let mut p = expectrl::spawn("echo 123").unwrap();
467
    /// // wait to guarantee that check will succeed (most likely)
468
    /// std::thread::sleep(std::time::Duration::from_secs(1));
469
    /// let m = p.check(expectrl::Regex("\\d+")).await.unwrap();
470
    /// assert_eq!(m.get(0).unwrap(), b"123");
471
    /// #
472
    /// # });
473
    /// ```
474
    async fn check<E: Needle>(&mut self, needle: E) -> Result<Captures, Error> {
×
475
        let eof = self.try_fill().await?;
×
476

477
        let buf = self.stream.buffer();
×
478
        let found = needle.check(buf, eof)?;
×
479
        if !found.is_empty() {
×
480
            let end_index = Captures::right_most_index(&found);
×
481
            let involved_bytes = buf[..end_index].to_vec();
×
482
            self.stream.consume(end_index);
×
483
            return Ok(Captures::new(involved_bytes, found));
×
484
        }
485

486
        if eof {
×
487
            return Err(Error::Eof);
×
488
        }
489

490
        Ok(Captures::new(Vec::new(), Vec::new()))
×
491
    }
492

493
    /// Verifyes if stream is empty or not.
494
    async fn is_empty(&mut self) -> io::Result<bool> {
×
495
        match futures_lite::future::poll_once(self.read(&mut [])).await {
×
496
            Some(Ok(0)) => Ok(true),
497
            Some(Ok(_)) => Ok(false),
498
            Some(Err(err)) => Err(err),
×
499
            None => Ok(true),
500
        }
501
    }
502

503
    async fn try_fill(&mut self) -> Result<bool, Error> {
×
504
        match futures_lite::future::poll_once(self.stream.fill()).await {
×
505
            Some(Ok(n)) => Ok(n == 0),
×
506
            Some(Err(err)) => Err(err.into()),
×
507
            None => Ok(false),
508
        }
509
    }
510
}
511

512
impl<S: AsyncWrite + Unpin> AsyncWrite for Stream<S> {
513
    fn poll_write(
×
514
        mut self: Pin<&mut Self>,
515
        cx: &mut Context<'_>,
516
        buf: &[u8],
517
    ) -> Poll<io::Result<usize>> {
518
        Pin::new(&mut *self.stream.get_mut()).poll_write(cx, buf)
×
519
    }
520

521
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
522
        Pin::new(&mut *self.stream.get_mut()).poll_flush(cx)
×
523
    }
524

525
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
×
526
        Pin::new(&mut *self.stream.get_mut()).poll_close(cx)
×
527
    }
528

529
    fn poll_write_vectored(
×
530
        mut self: Pin<&mut Self>,
531
        cx: &mut Context<'_>,
532
        bufs: &[io::IoSlice<'_>],
533
    ) -> Poll<io::Result<usize>> {
534
        Pin::new(&mut *self.stream.get_mut()).poll_write_vectored(cx, bufs)
×
535
    }
536
}
537

538
impl<S: AsyncRead + Unpin> AsyncRead for Stream<S> {
539
    fn poll_read(
×
540
        mut self: Pin<&mut Self>,
541
        cx: &mut Context<'_>,
542
        buf: &mut [u8],
543
    ) -> Poll<io::Result<usize>> {
544
        Pin::new(&mut self.stream).poll_read(cx, buf)
×
545
    }
546
}
547

548
impl<S: AsyncRead + Unpin> AsyncBufRead for Stream<S> {
549
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
550
        Pin::new(&mut self.get_mut().stream).poll_fill_buf(cx)
×
551
    }
552

553
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
554
        Pin::new(&mut self.stream).consume(amt);
×
555
    }
556
}
557

558
/// BufferedStream is an IO stream together with a buffer of bytes
559
/// which are kept until they are consumed.
560
#[derive(Debug)]
561
struct BufferedStream<S> {
562
    stream: S,
563
    buffer: Vec<u8>,
564
    length: usize,
565
}
566

567
impl<S> BufferedStream<S> {
568
    fn new(stream: S) -> Self {
×
569
        Self {
570
            stream,
571
            buffer: Vec::new(),
×
572
            length: 0,
573
        }
574
    }
575

576
    fn keep(&mut self, buf: &[u8]) {
×
577
        self.buffer.extend(buf);
×
578
        self.length += buf.len();
×
579
    }
580

581
    fn buffer(&self) -> &[u8] {
×
582
        &self.buffer[..self.length]
×
583
    }
584

585
    fn get_mut(&mut self) -> &mut S {
×
586
        &mut self.stream
×
587
    }
588
}
589

590
impl<S: AsyncRead + Unpin> BufferedStream<S> {
591
    async fn fill(&mut self) -> io::Result<usize> {
×
592
        let mut buf = [0; 128];
×
593
        let n = self.stream.read(&mut buf).await?;
×
594
        self.keep(&buf[..n]);
×
595
        Ok(n)
×
596
    }
597
}
598

599
impl<S: AsyncRead + Unpin> AsyncRead for BufferedStream<S> {
600
    fn poll_read(
×
601
        mut self: Pin<&mut Self>,
602
        cx: &mut Context<'_>,
603
        buf: &mut [u8],
604
    ) -> Poll<io::Result<usize>> {
605
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
606
        let nread = std::io::Read::read(&mut rem, buf)?;
×
607
        self.consume(nread);
×
608
        Poll::Ready(Ok(nread))
×
609
    }
610

611
    fn poll_read_vectored(
×
612
        mut self: Pin<&mut Self>,
613
        cx: &mut Context<'_>,
614
        bufs: &mut [IoSliceMut<'_>],
615
    ) -> Poll<io::Result<usize>> {
616
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
×
617
        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
×
618
        self.consume(nread);
×
619
        Poll::Ready(Ok(nread))
×
620
    }
621
}
622

623
impl<S: AsyncRead + Unpin> AsyncBufRead for BufferedStream<S> {
624
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
×
625
        if self.buffer.is_empty() {
×
626
            let mut buf = [0; 128];
×
627
            let n = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
×
628
            self.keep(&buf[..n]);
×
629
        }
630

631
        let buf = self.get_mut().buffer();
×
632
        Poll::Ready(Ok(buf))
×
633
    }
634

635
    fn consume(mut self: Pin<&mut Self>, amt: usize) {
×
636
        let _ = self.buffer.drain(..amt);
×
637
        self.length -= amt;
×
638
    }
639
}
640

641
#[cfg(test)]
642
mod tests {
643
    use futures_lite::AsyncWriteExt;
644

645
    use crate::Eof;
646

647
    use super::*;
648

649
    #[test]
650
    fn test_expect_lazy() {
651
        let buf = b"Hello World".to_vec();
652
        let cursor = futures_lite::io::Cursor::new(buf);
653
        let mut stream = Stream::new(cursor);
654

655
        futures_lite::future::block_on(async {
656
            let found = stream.expect_lazy("World").await.unwrap();
657
            assert_eq!(b"Hello ", found.before());
658
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
659
        });
660
    }
661

662
    #[test]
663
    fn test_expect_lazy_eof() {
664
        let buf = b"Hello World".to_vec();
665
        let cursor = futures_lite::io::Cursor::new(buf);
666
        let mut stream = Stream::new(cursor);
667

668
        futures_lite::future::block_on(async {
669
            let found = stream.expect_lazy(Eof).await.unwrap();
670
            assert_eq!(b"", found.before());
671
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
672
        });
673

674
        let cursor = futures_lite::io::Cursor::new(Vec::new());
675
        let mut stream = Stream::new(cursor);
676

677
        futures_lite::future::block_on(async {
678
            let err = stream.expect_lazy("").await.unwrap_err();
679
            assert!(matches!(err, Error::Eof));
680
        });
681
    }
682

683
    #[test]
684
    fn test_expect_lazy_timeout() {
685
        futures_lite::future::block_on(async {
686
            let mut stream = Stream::new(NoEofReader::default());
687
            stream.set_expect_timeout(Some(Duration::from_millis(100)));
688

689
            stream.write_all(b"Hello").await.unwrap();
690

691
            let err = stream.expect_lazy("Hello World").await.unwrap_err();
692
            assert!(matches!(err, Error::ExpectTimeout));
693

694
            stream.write_all(b" World").await.unwrap();
695
            let found = stream.expect_lazy("World").await.unwrap();
696
            assert_eq!(b"Hello ", found.before());
697
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
698
        });
699
    }
700

701
    #[test]
702
    fn test_expect_gready() {
703
        let buf = b"Hello World".to_vec();
704
        let cursor = futures_lite::io::Cursor::new(buf);
705
        let mut stream = Stream::new(cursor);
706

707
        futures_lite::future::block_on(async {
708
            let found = stream.expect_gready("World").await.unwrap();
709
            assert_eq!(b"Hello ", found.before());
710
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
711
        });
712
    }
713

714
    #[test]
715
    fn test_expect_gready_eof() {
716
        let buf = b"Hello World".to_vec();
717
        let cursor = futures_lite::io::Cursor::new(buf);
718
        let mut stream = Stream::new(cursor);
719

720
        futures_lite::future::block_on(async {
721
            let found = stream.expect_gready(Eof).await.unwrap();
722
            assert_eq!(b"", found.before());
723
            assert_eq!(vec![b"Hello World"], found.matches().collect::<Vec<_>>());
724
        });
725

726
        let cursor = futures_lite::io::Cursor::new(Vec::new());
727
        let mut stream = Stream::new(cursor);
728

729
        futures_lite::future::block_on(async {
730
            let err = stream.expect_gready("").await.unwrap_err();
731
            assert!(matches!(err, Error::Eof));
732
        });
733
    }
734

735
    #[test]
736
    fn test_expect_gready_timeout() {
737
        futures_lite::future::block_on(async {
738
            let mut stream = Stream::new(NoEofReader::default());
739
            stream.set_expect_timeout(Some(Duration::from_millis(100)));
740

741
            stream.write_all(b"Hello").await.unwrap();
742

743
            let err = stream.expect_gready("Hello World").await.unwrap_err();
744
            assert!(matches!(err, Error::ExpectTimeout));
745

746
            stream.write_all(b" World").await.unwrap();
747
            let found = stream.expect_gready("World").await.unwrap();
748
            assert_eq!(b"Hello ", found.before());
749
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
750
        });
751
    }
752

753
    #[test]
754
    fn test_check() {
755
        let buf = b"Hello World".to_vec();
756
        let cursor = futures_lite::io::Cursor::new(buf);
757
        let mut stream = Stream::new(cursor);
758

759
        futures_lite::future::block_on(async {
760
            let found = stream.check("World").await.unwrap();
761
            assert_eq!(b"Hello ", found.before());
762
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
763
        });
764
    }
765

766
    #[test]
767
    fn test_is_matched() {
768
        let mut stream = Stream::new(NoEofReader::default());
769
        futures_lite::future::block_on(async {
770
            stream.write_all(b"Hello World").await.unwrap();
771
            assert!(stream.is_matched("World").await.unwrap());
772
            assert!(!stream.is_matched("*****").await.unwrap());
773

774
            let found = stream.check("World").await.unwrap();
775
            assert_eq!(b"Hello ", found.before());
776
            assert_eq!(vec![b"World"], found.matches().collect::<Vec<_>>());
777
        });
778
    }
779

780
    #[derive(Debug, Default)]
781
    struct NoEofReader {
782
        data: Vec<u8>,
783
    }
784

785
    impl AsyncWrite for NoEofReader {
786
        fn poll_write(
×
787
            mut self: Pin<&mut Self>,
788
            _: &mut Context<'_>,
789
            buf: &[u8],
790
        ) -> Poll<io::Result<usize>> {
791
            self.data.extend(buf);
×
792
            Poll::Ready(Ok(buf.len()))
×
793
        }
794

795
        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
×
796
            Poll::Ready(Ok(()))
×
797
        }
798

799
        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
×
800
            Poll::Ready(Ok(()))
×
801
        }
802
    }
803

804
    impl AsyncRead for NoEofReader {
805
        fn poll_read(
×
806
            mut self: Pin<&mut Self>,
807
            _: &mut Context<'_>,
808
            mut buf: &mut [u8],
809
        ) -> Poll<io::Result<usize>> {
810
            if self.data.is_empty() {
×
811
                return Poll::Pending;
×
812
            }
813

814
            let n = std::io::Write::write(&mut buf, &self.data)?;
×
815
            let _ = self.data.drain(..n);
×
816
            Poll::Ready(Ok(n))
×
817
        }
818
    }
819
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc