• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 4368576544

pending completion
4368576544

push

github

GitHub
Make api interchangable for unix windows (#60)

73 of 73 new or added lines in 10 files covered. (100.0%)

1471 of 2612 relevant lines covered (56.32%)

3.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.29
/src/session/sync_session.rs
1
//! Module contains a Session structure.
2

3
use std::{
4
    io::{self, BufRead, BufReader, Read, Write},
5
    time::{self, Duration},
6
};
7

8
use crate::{
9
    error::Error,
10
    needle::Needle,
11
    process::{Healthcheck, NonBlocking},
12
    Captures,
13
};
14

15
/// Session represents a spawned process and its streams.
16
/// It controlls process and communication with it.
17
#[derive(Debug)]
18
pub struct Session<P, S> {
19
    proc: P,
20
    stream: TryStream<S>,
21
    expect_timeout: Option<Duration>,
22
    expect_lazy: bool,
23
}
24

25
impl<P, S> Session<P, S>
26
where
27
    S: Read,
28
{
29
    pub(crate) fn new(process: P, stream: S) -> io::Result<Self> {
10✔
30
        let stream = TryStream::new(stream)?;
20✔
31
        Ok(Self {
10✔
32
            proc: process,
10✔
33
            stream,
10✔
34
            expect_timeout: Some(Duration::from_millis(10000)),
20✔
35
            expect_lazy: false,
×
36
        })
37
    }
38

39
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R: Read>(
2✔
40
        mut self,
41
        new_stream: F,
42
    ) -> Result<Session<P, R>, Error> {
43
        self.stream.flush_in_buffer();
2✔
44
        let buf = self.stream.get_available().to_owned();
2✔
45

46
        let stream = self.stream.into_inner();
4✔
47
        let new_stream = new_stream(stream);
2✔
48

49
        let mut session = Session::new(self.proc, new_stream)?;
2✔
50
        session.stream.keep_in_buffer(&buf);
4✔
51
        Ok(session)
2✔
52
    }
53
}
54

55
impl<P, S> Session<P, S> {
56
    /// Set the pty session's expect timeout.
57
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
1✔
58
        self.expect_timeout = expect_timeout;
1✔
59
    }
60

61
    /// Set a expect algorithm to be either gready or lazy.
62
    ///
63
    /// Default algorithm is gready.
64
    ///
65
    /// See [Session::expect].
66
    pub fn set_expect_lazy(&mut self, lazy: bool) {
1✔
67
        self.expect_lazy = lazy;
1✔
68
    }
69

70
    /// Get a reference to original stream.
71
    pub fn get_stream(&self) -> &S {
×
72
        self.stream.as_ref()
×
73
    }
74

75
    /// Get a mut reference to original stream.
76
    pub fn get_stream_mut(&mut self) -> &mut S {
×
77
        self.stream.as_mut()
×
78
    }
79

80
    /// Get a reference to a process running program.
81
    pub fn get_process(&self) -> &P {
6✔
82
        &self.proc
×
83
    }
84

85
    /// Get a mut reference to a process running program.
86
    pub fn get_process_mut(&mut self) -> &mut P {
5✔
87
        &mut self.proc
×
88
    }
89
}
90

91
impl<P: Healthcheck, S> Session<P, S> {
92
    /// Verifies whether process is still alive.
93
    pub fn is_alive(&mut self) -> Result<bool, Error> {
×
94
        self.proc.is_alive().map_err(|err| err.into())
×
95
    }
96
}
97

98
impl<P, S: Read + NonBlocking> Session<P, S> {
99
    /// Expect waits until a pattern is matched.
100
    ///
101
    /// If the method returns [Ok] it is guaranteed that at least 1 match was found.
102
    ///
103
    /// The match algorthm can be either
104
    ///     - gready
105
    ///     - lazy
106
    ///
107
    /// You can set one via [Session::set_expect_lazy].
108
    /// Default version is gready.
109
    ///
110
    /// The implications are.
111
    /// Imagine you use [crate::Regex] `"\d+"` to find a match.
112
    /// And your process outputs `123`.
113
    /// In case of lazy approach we will match `1`.
114
    /// Where's in case of gready one we will match `123`.
115
    ///
116
    /// # Example
117
    ///
118
    #[cfg_attr(windows, doc = "```no_run")]
119
    #[cfg_attr(unix, doc = "```")]
120
    /// let mut p = expectrl::spawn("echo 123").unwrap();
121
    /// let m = p.expect(expectrl::Regex("\\d+")).unwrap();
122
    /// assert_eq!(m.get(0).unwrap(), b"123");
123
    /// ```
124
    ///
125
    #[cfg_attr(windows, doc = "```no_run")]
126
    #[cfg_attr(unix, doc = "```")]
127
    /// let mut p = expectrl::spawn("echo 123").unwrap();
128
    /// p.set_expect_lazy(true);
129
    /// let m = p.expect(expectrl::Regex("\\d+")).unwrap();
130
    /// assert_eq!(m.get(0).unwrap(), b"1");
131
    /// ```
132
    ///
133
    /// This behaviour is different from [Session::check].
134
    ///
135
    /// It returns an error if timeout is reached.
136
    /// You can specify a timeout value by [Session::set_expect_timeout] method.
137
    pub fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
9✔
138
    where
139
        N: Needle,
140
    {
141
        match self.expect_lazy {
9✔
142
            true => self.expect_lazy(needle),
1✔
143
            false => self.expect_gready(needle),
9✔
144
        }
145
    }
146

147
    /// Expect which fills as much as possible to the buffer.
148
    ///
149
    /// See [Session::expect].
150
    fn expect_gready<N>(&mut self, needle: N) -> Result<Captures, Error>
9✔
151
    where
152
        N: Needle,
153
    {
154
        let start = time::Instant::now();
18✔
155
        loop {
2✔
156
            let eof = self.stream.read_available()?;
9✔
157
            let data = self.stream.get_available();
18✔
158

159
            let found = needle.check(data, eof)?;
9✔
160
            if !found.is_empty() {
18✔
161
                let end_index = Captures::right_most_index(&found);
8✔
162
                let involved_bytes = data[..end_index].to_vec();
8✔
163
                self.stream.consume_available(end_index);
8✔
164

165
                return Ok(Captures::new(involved_bytes, found));
8✔
166
            }
167

168
            if eof {
3✔
169
                return Err(Error::Eof);
1✔
170
            }
171

172
            if let Some(timeout) = self.expect_timeout {
4✔
173
                if start.elapsed() > timeout {
4✔
174
                    return Err(Error::ExpectTimeout);
1✔
175
                }
176
            }
177
        }
178
    }
179

180
    /// Expect which reads byte by byte.
181
    ///
182
    /// See [Session::expect].
183
    fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
1✔
184
    where
185
        N: Needle,
186
    {
187
        let mut checking_data_length = 0;
1✔
188
        let mut eof = false;
1✔
189
        let start = time::Instant::now();
2✔
190
        loop {
1✔
191
            let mut available = self.stream.get_available();
1✔
192
            if checking_data_length == available.len() {
2✔
193
                // We read by byte to make things as lazy as possible.
194
                //
195
                // It's chose is important in using Regex as a Needle.
196
                // Imagine we have a `\d+` regex.
197
                // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
198
                //
199
                // The second reason is
200
                // if we wouldn't read by byte EOF indication could be lost.
201
                // And next blocking std::io::Read operation could be blocked forever.
202
                //
203
                // We could read all data available via `read_available` to reduce IO operations,
204
                // but in such case we would need to keep a EOF indicator internally in stream,
205
                // which is OK if EOF happens onces, but I am not sure if this is a case.
206
                eof = self.stream.read_available_once(&mut [0; 1])? == Some(0);
2✔
207
                available = self.stream.get_available();
1✔
208
            }
209

210
            // We intentinally not increase the counter
211
            // and run check one more time even though the data isn't changed.
212
            // Because it may be important for custom implementations of Needle.
213
            if checking_data_length < available.len() {
3✔
214
                checking_data_length += 1;
1✔
215
            }
216

217
            let data = &available[..checking_data_length];
2✔
218

219
            let found = needle.check(data, eof)?;
1✔
220
            if !found.is_empty() {
2✔
221
                let end_index = Captures::right_most_index(&found);
1✔
222
                let involved_bytes = data[..end_index].to_vec();
1✔
223
                self.stream.consume_available(end_index);
1✔
224
                return Ok(Captures::new(involved_bytes, found));
1✔
225
            }
226

227
            if eof {
1✔
228
                return Err(Error::Eof);
×
229
            }
230

231
            if let Some(timeout) = self.expect_timeout {
2✔
232
                if start.elapsed() > timeout {
2✔
233
                    return Err(Error::ExpectTimeout);
×
234
                }
235
            }
236
        }
237
    }
238

239
    /// Check verifies if a pattern is matched.
240
    /// Returns empty found structure if nothing found.
241
    ///
242
    /// Is a non blocking version of [Session::expect].
243
    /// But its strategy of matching is different from it.
244
    /// It makes search against all bytes available.
245
    ///
246
    /// # Example
247
    ///
248
    #[cfg_attr(windows, doc = "```no_run")]
249
    #[cfg_attr(unix, doc = "```")]
250
    /// use expectrl::{spawn, Regex};
251
    /// use std::time::Duration;
252
    ///
253
    /// let mut p = spawn("echo 123").unwrap();
254
    /// #
255
    /// # // wait to guarantee that check echo worked out (most likely)
256
    /// # std::thread::sleep(Duration::from_millis(500));
257
    /// #
258
    /// let m = p.check(Regex("\\d+")).unwrap();
259
    /// assert_eq!(m.get(0).unwrap(), b"123");
260
    /// ```
261
    pub fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
6✔
262
    where
263
        N: Needle,
264
    {
265
        let eof = self.stream.read_available()?;
12✔
266
        let buf = self.stream.get_available();
12✔
267

268
        let found = needle.check(buf, eof)?;
6✔
269
        if !found.is_empty() {
12✔
270
            let end_index = Captures::right_most_index(&found);
6✔
271
            let involved_bytes = buf[..end_index].to_vec();
6✔
272
            self.stream.consume_available(end_index);
6✔
273
            return Ok(Captures::new(involved_bytes, found));
6✔
274
        }
275

276
        if eof {
3✔
277
            return Err(Error::Eof);
×
278
        }
279

280
        Ok(Captures::new(Vec::new(), Vec::new()))
6✔
281
    }
282

283
    /// The functions checks if a pattern is matched.
284
    /// It doesn’t consumes bytes from stream.
285
    ///
286
    /// Its strategy of matching is different from the one in [Session::expect].
287
    /// It makes search agains all bytes available.
288
    ///
289
    /// If you want to get a matched result [Session::check] and [Session::expect] is a better option.
290
    /// Because it is not guaranteed that [Session::check] or [Session::expect] with the same parameters:
291
    ///     - will successed even right after Session::is_matched call.
292
    ///     - will operate on the same bytes.
293
    ///
294
    /// IMPORTANT:
295
    ///  
296
    /// If you call this method with [crate::Eof] pattern be aware that eof
297
    /// indication MAY be lost on the next interactions.
298
    /// It depends from a process you spawn.
299
    /// So it might be better to use [Session::check] or [Session::expect] with Eof.
300
    ///
301
    /// # Example
302
    ///
303
    #[cfg_attr(windows, doc = "```no_run")]
304
    #[cfg_attr(unix, doc = "```")]
305
    /// use expectrl::{spawn, Regex};
306
    /// use std::time::Duration;
307
    ///
308
    /// let mut p = spawn("cat").unwrap();
309
    /// p.send_line("123");
310
    /// # // wait to guarantee that check echo worked out (most likely)
311
    /// # std::thread::sleep(Duration::from_secs(1));
312
    /// let m = p.is_matched(Regex("\\d+")).unwrap();
313
    /// assert_eq!(m, true);
314
    /// ```
315
    pub fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
4✔
316
    where
317
        N: Needle,
318
    {
319
        let eof = self.stream.read_available()?;
8✔
320
        let buf = self.stream.get_available();
8✔
321

322
        let found = needle.check(buf, eof)?;
4✔
323
        if !found.is_empty() {
8✔
324
            return Ok(true);
4✔
325
        }
326

327
        if eof {
×
328
            return Err(Error::Eof);
×
329
        }
330

331
        Ok(false)
332
    }
333
}
334

335
impl<Proc, Stream: Write> Session<Proc, Stream> {
336
    /// Send text to child’s STDIN.
337
    ///
338
    /// You can also use methods from [std::io::Write] instead.
339
    ///
340
    /// # Example
341
    ///
342
    /// ```
343
    /// use expectrl::{spawn, ControlCode};
344
    ///
345
    /// let mut proc = spawn("cat").unwrap();
346
    ///
347
    /// proc.send("Hello");
348
    /// proc.send(b"World");
349
    /// proc.send(ControlCode::try_from("^C").unwrap());
350
    /// ```
351
    pub fn send<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
5✔
352
        self.stream.write_all(buf.as_ref())
10✔
353
    }
354

355
    /// Send a line to child’s STDIN.
356
    ///
357
    /// # Example
358
    ///
359
    /// ```
360
    /// use expectrl::{spawn, ControlCode};
361
    ///
362
    /// let mut proc = spawn("cat").unwrap();
363
    ///
364
    /// proc.send_line("Hello");
365
    /// proc.send_line(b"World");
366
    /// proc.send_line(ControlCode::try_from("^C").unwrap());
367
    /// ```
368
    pub fn send_line<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
9✔
369
        #[cfg(windows)]
370
        const LINE_ENDING: &[u8] = b"\r\n";
371
        #[cfg(not(windows))]
372
        const LINE_ENDING: &[u8] = b"\n";
373

374
        self.stream.write_all(buf.as_ref())?;
18✔
375
        self.write_all(LINE_ENDING)?;
18✔
376

377
        Ok(())
9✔
378
    }
379
}
380

381
impl<P, S: Read + NonBlocking> Session<P, S> {
382
    /// Try to read in a non-blocking mode.
383
    ///
384
    /// Returns `[std::io::ErrorKind::WouldBlock]`
385
    /// in case if there's nothing to read.
386
    pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2✔
387
        self.stream.try_read(buf)
2✔
388
    }
389

390
    /// Verifyes if stream is empty or not.
391
    pub fn is_empty(&mut self) -> io::Result<bool> {
1✔
392
        self.stream.is_empty()
1✔
393
    }
394
}
395

396
impl<P, S: Write> Write for Session<P, S> {
397
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
9✔
398
        self.stream.write(buf)
9✔
399
    }
400

401
    fn flush(&mut self) -> std::io::Result<()> {
1✔
402
        self.stream.flush()
1✔
403
    }
404

405
    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
×
406
        self.stream.write_vectored(bufs)
×
407
    }
408
}
409

410
impl<P, S: Read> Read for Session<P, S> {
411
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
6✔
412
        self.stream.read(buf)
6✔
413
    }
414
}
415

416
impl<P, S: Read> BufRead for Session<P, S> {
417
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
4✔
418
        self.stream.fill_buf()
4✔
419
    }
420

421
    fn consume(&mut self, amt: usize) {
4✔
422
        self.stream.consume(amt)
4✔
423
    }
424
}
425

426
#[derive(Debug)]
427
struct TryStream<S> {
428
    stream: ControlledReader<S>,
429
}
430

431
impl<S> TryStream<S> {
432
    fn into_inner(self) -> S {
2✔
433
        self.stream.inner.into_inner().inner
2✔
434
    }
435

436
    fn as_ref(&self) -> &S {
×
437
        &self.stream.inner.get_ref().inner
×
438
    }
439

440
    fn as_mut(&mut self) -> &mut S {
×
441
        &mut self.stream.inner.get_mut().inner
×
442
    }
443
}
444

445
impl<S: Read> TryStream<S> {
446
    /// The function returns a new Stream from a file.
447
    fn new(stream: S) -> io::Result<Self> {
10✔
448
        Ok(Self {
10✔
449
            stream: ControlledReader::new(stream),
10✔
450
        })
451
    }
452

453
    fn flush_in_buffer(&mut self) {
2✔
454
        self.stream.flush_in_buffer();
2✔
455
    }
456
}
457

458
impl<S> TryStream<S> {
459
    fn keep_in_buffer(&mut self, v: &[u8]) {
2✔
460
        self.stream.keep_in_buffer(v);
2✔
461
    }
462

463
    fn get_available(&mut self) -> &[u8] {
7✔
464
        self.stream.get_available()
7✔
465
    }
466

467
    fn consume_available(&mut self, n: usize) {
6✔
468
        self.stream.consume_available(n)
6✔
469
    }
470
}
471

472
impl<R: Read + NonBlocking> TryStream<R> {
473
    /// Try to read in a non-blocking mode.
474
    ///
475
    /// It raises io::ErrorKind::WouldBlock if there's nothing to read.
476
    fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2✔
477
        self.stream.get_mut().set_non_blocking()?;
2✔
478

479
        let result = self.stream.inner.read(buf);
2✔
480

481
        // As file is DUPed changes in one descriptor affects all ones
482
        // so we need to make blocking file after we finished.
483
        self.stream.get_mut().set_blocking()?;
4✔
484

485
        result
2✔
486
    }
487

488
    #[allow(clippy::wrong_self_convention)]
489
    fn is_empty(&mut self) -> io::Result<bool> {
1✔
490
        match self.try_read(&mut []) {
1✔
491
            Ok(0) => Ok(true),
492
            Ok(_) => Ok(false),
493
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(true),
494
            Err(err) => Err(err),
×
495
        }
496
    }
497

498
    fn read_available(&mut self) -> std::io::Result<bool> {
6✔
499
        self.stream.flush_in_buffer();
6✔
500

501
        let mut buf = [0; 248];
6✔
502
        loop {
6✔
503
            match self.try_read_inner(&mut buf) {
6✔
504
                Ok(0) => break Ok(true),
3✔
505
                Ok(n) => {
6✔
506
                    self.stream.keep_in_buffer(&buf[..n]);
6✔
507
                }
508
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => break Ok(false),
18✔
509
                Err(err) => break Err(err),
×
510
            }
511
        }
512
    }
513

514
    fn read_available_once(&mut self, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
1✔
515
        self.stream.flush_in_buffer();
1✔
516

517
        match self.try_read_inner(buf) {
1✔
518
            Ok(0) => Ok(Some(0)),
×
519
            Ok(n) => {
1✔
520
                self.stream.keep_in_buffer(&buf[..n]);
1✔
521
                Ok(Some(n))
1✔
522
            }
523
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(None),
×
524
            Err(err) => Err(err),
×
525
        }
526
    }
527

528
    // non-buffered && non-blocking read
529
    fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
6✔
530
        self.stream.get_mut().set_non_blocking()?;
6✔
531

532
        let result = self.stream.get_mut().read(buf);
6✔
533

534
        // As file is DUPed changes in one descriptor affects all ones
535
        // so we need to make blocking file after we finished.
536
        self.stream.get_mut().set_blocking()?;
12✔
537

538
        result
6✔
539
    }
540
}
541

542
impl<S: Write> Write for TryStream<S> {
543
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
9✔
544
        self.stream.inner.get_mut().inner.write(buf)
9✔
545
    }
546

547
    fn flush(&mut self) -> io::Result<()> {
1✔
548
        self.stream.inner.get_mut().inner.flush()
1✔
549
    }
550

551
    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
×
552
        self.stream.inner.get_mut().inner.write_vectored(bufs)
×
553
    }
554
}
555

556
impl<R: Read> Read for TryStream<R> {
557
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
6✔
558
        self.stream.inner.read(buf)
6✔
559
    }
560
}
561

562
impl<R: Read> BufRead for TryStream<R> {
563
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
4✔
564
        self.stream.inner.fill_buf()
4✔
565
    }
566

567
    fn consume(&mut self, amt: usize) {
4✔
568
        self.stream.inner.consume(amt)
4✔
569
    }
570
}
571

572
#[derive(Debug)]
573
struct ControlledReader<R> {
574
    inner: BufReader<BufferedReader<R>>,
575
}
576

577
impl<R: Read> ControlledReader<R> {
578
    fn new(reader: R) -> Self {
10✔
579
        Self {
580
            inner: BufReader::new(BufferedReader::new(reader)),
10✔
581
        }
582
    }
583

584
    fn flush_in_buffer(&mut self) {
14✔
585
        // Because we have 2 buffered streams there might appear inconsistancy
586
        // in read operations and the data which was via `keep_in_buffer` function.
587
        //
588
        // To eliminate it we move BufReader buffer to our buffer.
589
        let b = self.inner.buffer().to_vec();
7✔
590
        self.inner.consume(b.len());
14✔
591
        self.keep_in_buffer(&b);
7✔
592
    }
593
}
594

595
impl<R> ControlledReader<R> {
596
    fn keep_in_buffer(&mut self, v: &[u8]) {
9✔
597
        self.inner.get_mut().buffer.extend(v);
9✔
598
    }
599

600
    fn get_mut(&mut self) -> &mut R {
7✔
601
        &mut self.inner.get_mut().inner
7✔
602
    }
603

604
    fn get_available(&mut self) -> &[u8] {
7✔
605
        &self.inner.get_ref().buffer
7✔
606
    }
607

608
    fn consume_available(&mut self, n: usize) {
6✔
609
        let _ = self.inner.get_mut().buffer.drain(..n);
6✔
610
    }
611
}
612

613
#[derive(Debug)]
614
struct BufferedReader<R> {
615
    inner: R,
616
    buffer: Vec<u8>,
617
}
618

619
impl<R> BufferedReader<R> {
620
    fn new(reader: R) -> Self {
10✔
621
        Self {
622
            inner: reader,
623
            buffer: Vec::new(),
10✔
624
        }
625
    }
626
}
627

628
impl<R: Read> Read for BufferedReader<R> {
629
    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
9✔
630
        if self.buffer.is_empty() {
12✔
631
            self.inner.read(buf)
6✔
632
        } else {
633
            let n = buf.write(&self.buffer)?;
6✔
634
            let _ = self.buffer.drain(..n);
3✔
635
            Ok(n)
3✔
636
        }
637
    }
638
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc