• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8133416044

03 Mar 2024 10:18PM UTC coverage: 55.91% (-0.7%) from 56.575%
8133416044

Pull #68

github

web-flow
Merge e8846ebf2 into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

219 of 377 new or added lines in 14 files covered. (58.09%)

155 existing lines in 6 files now uncovered.

1490 of 2665 relevant lines covered (55.91%)

3.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.79
/src/session/sync_session.rs
1
//! Module contains a Session structure.
2

3
use std::{
4
    io::{self, BufRead, BufReader, Read, Write},
5
    time::{self, Duration},
6
};
7

8
use crate::{
9
    error::Error,
10
    expect::Expect,
11
    needle::Needle,
12
    process::{Healthcheck, NonBlocking, Termios},
13
    Captures,
14
};
15

16
/// Session represents a spawned process and its streams.
17
/// It controlls process and communication with it.
18
#[derive(Debug)]
19
pub struct Session<P, S> {
20
    proc: P,
21
    stream: TryStream<S>,
22
    expect_timeout: Option<Duration>,
23
    expect_lazy: bool,
24
}
25

26
impl<P, S> Session<P, S>
27
where
28
    S: Read,
29
{
30
    /// Creates a new session.
31
    pub fn new(process: P, stream: S) -> io::Result<Self> {
10✔
32
        let stream = TryStream::new(stream)?;
20✔
33

34
        Ok(Self {
10✔
35
            proc: process,
10✔
36
            stream,
10✔
37
            expect_timeout: Some(Duration::from_millis(10000)),
20✔
38
            expect_lazy: false,
×
39
        })
40
    }
41

42
    pub(crate) fn swap_stream<F, R>(mut self, new: F) -> Result<Session<P, R>, Error>
2✔
43
    where
44
        F: FnOnce(S) -> R,
45
        R: Read,
46
    {
47
        self.stream.flush_in_buffer();
2✔
48
        let buf = self.stream.get_available().to_owned();
2✔
49

50
        let stream = self.stream.into_inner();
4✔
51
        let stream = new(stream);
2✔
52

53
        let mut session = Session::new(self.proc, stream)?;
2✔
54
        session.stream.keep_in_buffer(&buf);
4✔
55

56
        Ok(session)
2✔
57
    }
58
}
59

60
impl<P, S> Session<P, S> {
61
    /// Set the pty session's expect timeout.
62
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
1✔
63
        self.expect_timeout = expect_timeout;
1✔
64
    }
65

66
    /// Set a expect algorithm to be either gready or lazy.
67
    ///
68
    /// Default algorithm is gready.
69
    ///
70
    /// See [Session::expect].
71
    pub fn set_expect_lazy(&mut self, lazy: bool) {
1✔
72
        self.expect_lazy = lazy;
1✔
73
    }
74

75
    /// Get a reference to original stream.
76
    pub fn get_stream(&self) -> &S {
×
77
        self.stream.as_ref()
×
78
    }
79

80
    /// Get a mut reference to original stream.
81
    pub fn get_stream_mut(&mut self) -> &mut S {
1✔
82
        self.stream.as_mut()
1✔
83
    }
84

85
    /// Get a reference to a process running program.
86
    pub fn get_process(&self) -> &P {
5✔
87
        &self.proc
1✔
88
    }
89

90
    /// Get a mut reference to a process running program.
91
    pub fn get_process_mut(&mut self) -> &mut P {
6✔
92
        &mut self.proc
×
93
    }
94
}
95

96
impl<P, S> Expect for Session<P, S>
97
where
98
    S: Write + Read + NonBlocking,
99
{
100
    fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
9✔
101
    where
102
        N: Needle,
103
    {
104
        match self.expect_lazy {
9✔
105
            true => self.expect_lazy(needle),
1✔
106
            false => self.expect_gready(needle),
9✔
107
        }
108
    }
109

110
    fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
6✔
111
    where
112
        N: Needle,
113
    {
114
        let eof = self.stream.read_available()?;
12✔
115
        let buf = self.stream.get_available();
12✔
116

117
        let found = needle.check(buf, eof)?;
6✔
118
        if !found.is_empty() {
12✔
119
            let end_index = Captures::right_most_index(&found);
12✔
120
            let involved_bytes = buf[..end_index].to_vec();
6✔
121
            self.stream.consume_available(end_index);
6✔
122
            return Ok(Captures::new(involved_bytes, found));
6✔
123
        }
124

125
        if eof {
3✔
NEW
126
            return Err(Error::Eof);
×
127
        }
128

129
        Ok(Captures::new(Vec::new(), Vec::new()))
6✔
130
    }
131

132
    fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
4✔
133
    where
134
        N: Needle,
135
    {
136
        let eof = self.stream.read_available()?;
8✔
137
        let buf = self.stream.get_available();
8✔
138

139
        let found = needle.check(buf, eof)?;
4✔
140
        if !found.is_empty() {
8✔
141
            return Ok(true);
4✔
142
        }
143

NEW
144
        if eof {
×
NEW
145
            return Err(Error::Eof);
×
146
        }
147

148
        Ok(false)
149
    }
150

151
    fn send<B>(&mut self, buf: B) -> Result<(), Error>
5✔
152
    where
153
        B: AsRef<[u8]>,
154
    {
155
        self.stream.write_all(buf.as_ref())?;
10✔
156

157
        Ok(())
5✔
158
    }
159

160
    fn send_line<B>(&mut self, buf: B) -> Result<(), Error>
9✔
161
    where
162
        B: AsRef<[u8]>,
163
    {
164
        #[cfg(windows)]
165
        const LINE_ENDING: &[u8] = b"\r\n";
166
        #[cfg(not(windows))]
167
        const LINE_ENDING: &[u8] = b"\n";
168

169
        self.stream.write_all(buf.as_ref())?;
18✔
170
        self.write_all(LINE_ENDING)?;
27✔
171

172
        Ok(())
9✔
173
    }
174
}
175

176
impl<P, S> Session<P, S>
177
where
178
    S: Read + NonBlocking,
179
{
180
    /// Expect which fills as much as possible to the buffer.
181
    ///
182
    /// See [Session::expect].
183
    fn expect_gready<N>(&mut self, needle: N) -> Result<Captures, Error>
9✔
184
    where
185
        N: Needle,
186
    {
187
        let start = time::Instant::now();
18✔
188
        loop {
2✔
189
            let eof = self.stream.read_available()?;
9✔
190
            let data = self.stream.get_available();
18✔
191

192
            let found = needle.check(data, eof)?;
9✔
193
            if !found.is_empty() {
18✔
194
                let end_index = Captures::right_most_index(&found);
16✔
195
                let involved_bytes = data[..end_index].to_vec();
8✔
196
                self.stream.consume_available(end_index);
8✔
197

198
                return Ok(Captures::new(involved_bytes, found));
8✔
199
            }
200

201
            if eof {
3✔
202
                return Err(Error::Eof);
1✔
203
            }
204

205
            if let Some(timeout) = self.expect_timeout {
4✔
206
                if start.elapsed() > timeout {
4✔
207
                    return Err(Error::ExpectTimeout);
1✔
208
                }
209
            }
210
        }
211
    }
212

213
    /// Expect which reads byte by byte.
214
    ///
215
    /// See [Session::expect].
216
    fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
1✔
217
    where
218
        N: Needle,
219
    {
220
        let mut checking_data_length = 0;
1✔
221
        let mut eof = false;
1✔
222
        let start = time::Instant::now();
2✔
223
        loop {
1✔
224
            let mut available = self.stream.get_available();
1✔
225
            if checking_data_length == available.len() {
2✔
226
                // We read by byte to make things as lazy as possible.
227
                //
228
                // It's chose is important in using Regex as a Needle.
229
                // Imagine we have a `\d+` regex.
230
                // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
231
                //
232
                // The second reason is
233
                // if we wouldn't read by byte EOF indication could be lost.
234
                // And next blocking std::io::Read operation could be blocked forever.
235
                //
236
                // We could read all data available via `read_available` to reduce IO operations,
237
                // but in such case we would need to keep a EOF indicator internally in stream,
238
                // which is OK if EOF happens onces, but I am not sure if this is a case.
239
                eof = self.stream.read_available_once(&mut [0; 1])? == Some(0);
6✔
240
                available = self.stream.get_available();
1✔
241
            }
242

243
            // We intentinally not increase the counter
244
            // and run check one more time even though the data isn't changed.
245
            // Because it may be important for custom implementations of Needle.
246
            if checking_data_length < available.len() {
2✔
247
                checking_data_length += 1;
1✔
248
            }
249

250
            let data = &available[..checking_data_length];
2✔
251

252
            let found = needle.check(data, eof)?;
1✔
253
            if !found.is_empty() {
2✔
254
                let end_index = Captures::right_most_index(&found);
2✔
255
                let involved_bytes = data[..end_index].to_vec();
1✔
256
                self.stream.consume_available(end_index);
1✔
257
                return Ok(Captures::new(involved_bytes, found));
1✔
258
            }
259

260
            if eof {
1✔
261
                return Err(Error::Eof);
×
262
            }
263

264
            if let Some(timeout) = self.expect_timeout {
2✔
265
                if start.elapsed() > timeout {
2✔
266
                    return Err(Error::ExpectTimeout);
×
267
                }
268
            }
269
        }
270
    }
271
}
272

273
impl<P, S> Session<P, S>
274
where
275
    S: Read + NonBlocking,
276
{
277
    /// Try to read in a non-blocking mode.
278
    ///
279
    /// Returns [`std::io::ErrorKind::WouldBlock`]
280
    /// in case there's nothing to read.
281
    pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1✔
282
        self.stream.try_read(buf)
1✔
283
    }
284

285
    /// Verifyes if stream is empty or not.
286
    pub fn is_empty(&mut self) -> io::Result<bool> {
1✔
287
        self.stream.is_empty()
1✔
288
    }
289
}
290

291
impl<P, S> Write for Session<P, S>
292
where
293
    S: Write,
294
{
295
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
9✔
296
        self.stream.write(buf)
9✔
297
    }
298

299
    fn flush(&mut self) -> std::io::Result<()> {
1✔
300
        self.stream.flush()
1✔
301
    }
302

303
    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
×
304
        self.stream.write_vectored(bufs)
×
305
    }
306
}
307

308
impl<P, S> Read for Session<P, S>
309
where
310
    S: Read,
311
{
312
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
7✔
313
        self.stream.read(buf)
7✔
314
    }
315
}
316

317
impl<P, S> BufRead for Session<P, S>
318
where
319
    S: Read,
320
{
321
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
4✔
322
        self.stream.fill_buf()
4✔
323
    }
324

325
    fn consume(&mut self, amt: usize) {
4✔
326
        self.stream.consume(amt)
4✔
327
    }
328
}
329

330
impl<P, S> Healthcheck for Session<P, S>
331
where
332
    P: Healthcheck,
333
{
334
    type Status = P::Status;
335

336
    fn get_status(&self) -> io::Result<Self::Status> {
2✔
337
        self.get_process().get_status()
2✔
338
    }
339

NEW
340
    fn is_alive(&self) -> io::Result<bool> {
×
NEW
341
        self.get_process().is_alive()
×
342
    }
343
}
344

345
impl<P, S> Termios for Session<P, S>
346
where
347
    P: Termios,
348
{
349
    fn is_echo(&self) -> io::Result<bool> {
2✔
350
        self.get_process().is_echo()
2✔
351
    }
352

353
    fn set_echo(&mut self, on: bool) -> io::Result<bool> {
2✔
354
        self.get_process_mut().set_echo(on)
2✔
355
    }
356
}
357

358
impl<P, S> NonBlocking for Session<P, S>
359
where
360
    S: NonBlocking,
361
{
362
    fn set_blocking(&mut self, on: bool) -> io::Result<()> {
1✔
363
        S::set_blocking(self.get_stream_mut(), on)
1✔
364
    }
365
}
366

367
#[cfg(unix)]
368
impl<P, S> std::os::fd::AsRawFd for Session<P, S>
369
where
370
    S: std::os::fd::AsRawFd,
371
{
NEW
372
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
×
NEW
373
        self.get_stream().as_raw_fd()
×
374
    }
375
}
376

377
#[cfg(unix)]
378
impl<P, S> std::os::fd::AsRawFd for &Session<P, S>
379
where
380
    S: std::os::fd::AsRawFd,
381
{
NEW
382
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
×
NEW
383
        self.get_stream().as_raw_fd()
×
384
    }
385
}
386

387
#[cfg(unix)]
388
impl<P, S> std::os::fd::AsRawFd for &mut Session<P, S>
389
where
390
    S: std::os::fd::AsRawFd,
391
{
NEW
392
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
×
NEW
393
        self.get_stream().as_raw_fd()
×
394
    }
395
}
396

397
#[derive(Debug)]
398
struct TryStream<S> {
399
    stream: ControlledReader<S>,
400
}
401

402
impl<S> TryStream<S> {
403
    fn into_inner(self) -> S {
2✔
404
        self.stream.inner.into_inner().inner
2✔
405
    }
406

407
    fn as_ref(&self) -> &S {
×
408
        &self.stream.inner.get_ref().inner
×
409
    }
410

411
    fn as_mut(&mut self) -> &mut S {
1✔
412
        &mut self.stream.inner.get_mut().inner
1✔
413
    }
414
}
415

416
impl<S> TryStream<S>
417
where
418
    S: Read,
419
{
420
    /// The function returns a new Stream from a file.
421
    fn new(stream: S) -> io::Result<Self> {
10✔
422
        Ok(Self {
10✔
423
            stream: ControlledReader::new(stream),
10✔
424
        })
425
    }
426

427
    fn flush_in_buffer(&mut self) {
2✔
428
        self.stream.flush_in_buffer();
2✔
429
    }
430
}
431

432
impl<S> TryStream<S> {
433
    fn keep_in_buffer(&mut self, v: &[u8]) {
2✔
434
        self.stream.keep_in_buffer(v);
2✔
435
    }
436

437
    fn get_available(&mut self) -> &[u8] {
7✔
438
        self.stream.get_available()
7✔
439
    }
440

441
    fn consume_available(&mut self, n: usize) {
6✔
442
        self.stream.consume_available(n)
6✔
443
    }
444
}
445

446
impl<R> TryStream<R>
447
where
448
    R: Read + NonBlocking,
449
{
450
    /// Try to read in a non-blocking mode.
451
    ///
452
    /// It raises io::ErrorKind::WouldBlock if there's nothing to read.
453
    fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1✔
454
        self.stream.get_mut().set_blocking(false)?;
1✔
455

456
        let result = self.stream.inner.read(buf);
1✔
457

458
        // As file is DUPed changes in one descriptor affects all ones
459
        // so we need to make blocking file after we finished.
460
        self.stream.get_mut().set_blocking(true)?;
3✔
461

462
        result
1✔
463
    }
464

465
    #[allow(clippy::wrong_self_convention)]
466
    fn is_empty(&mut self) -> io::Result<bool> {
1✔
467
        match self.try_read(&mut []) {
1✔
468
            Ok(0) => Ok(true),
469
            Ok(_) => Ok(false),
470
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(true),
471
            Err(err) => Err(err),
×
472
        }
473
    }
474

475
    fn read_available(&mut self) -> std::io::Result<bool> {
6✔
476
        self.stream.flush_in_buffer();
6✔
477

478
        let mut buf = [0; 248];
6✔
479
        loop {
6✔
480
            match self.try_read_inner(&mut buf) {
6✔
481
                Ok(0) => break Ok(true),
3✔
482
                Ok(n) => {
6✔
483
                    self.stream.keep_in_buffer(&buf[..n]);
12✔
484
                }
485
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => break Ok(false),
18✔
486
                Err(err) => break Err(err),
×
487
            }
488
        }
489
    }
490

491
    fn read_available_once(&mut self, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
1✔
492
        self.stream.flush_in_buffer();
1✔
493

494
        match self.try_read_inner(buf) {
1✔
495
            Ok(0) => Ok(Some(0)),
×
496
            Ok(n) => {
1✔
497
                self.stream.keep_in_buffer(&buf[..n]);
2✔
498
                Ok(Some(n))
1✔
499
            }
500
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(None),
×
501
            Err(err) => Err(err),
×
502
        }
503
    }
504

505
    // non-buffered && non-blocking read
506
    fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
6✔
507
        self.stream.get_mut().set_blocking(false)?;
6✔
508

509
        let result = self.stream.get_mut().read(buf);
6✔
510

511
        // As file is DUPed changes in one descriptor affects all ones
512
        // so we need to make blocking file after we finished.
513
        self.stream.get_mut().set_blocking(true)?;
18✔
514

515
        result
6✔
516
    }
517
}
518

519
impl<S> Write for TryStream<S>
520
where
521
    S: Write,
522
{
523
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
9✔
524
        self.stream.inner.get_mut().inner.write(buf)
9✔
525
    }
526

527
    fn flush(&mut self) -> io::Result<()> {
1✔
528
        self.stream.inner.get_mut().inner.flush()
1✔
529
    }
530

531
    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
×
532
        self.stream.inner.get_mut().inner.write_vectored(bufs)
×
533
    }
534
}
535

536
impl<R> Read for TryStream<R>
537
where
538
    R: Read,
539
{
540
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
7✔
541
        self.stream.inner.read(buf)
7✔
542
    }
543
}
544

545
impl<R> BufRead for TryStream<R>
546
where
547
    R: Read,
548
{
549
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
4✔
550
        self.stream.inner.fill_buf()
4✔
551
    }
552

553
    fn consume(&mut self, amt: usize) {
4✔
554
        self.stream.inner.consume(amt)
4✔
555
    }
556
}
557

558
#[derive(Debug)]
559
struct ControlledReader<R> {
560
    inner: BufReader<BufferedReader<R>>,
561
}
562

563
impl<R> ControlledReader<R>
564
where
565
    R: Read,
566
{
567
    fn new(reader: R) -> Self {
10✔
568
        Self {
569
            inner: BufReader::new(BufferedReader::new(reader)),
10✔
570
        }
571
    }
572

573
    fn flush_in_buffer(&mut self) {
7✔
574
        // Because we have 2 buffered streams there might appear inconsistancy
575
        // in read operations and the data which was via `keep_in_buffer` function.
576
        //
577
        // To eliminate it we move BufReader buffer to our buffer.
578
        let b = self.inner.buffer().to_vec();
7✔
579
        self.inner.consume(b.len());
14✔
580
        self.keep_in_buffer(&b);
7✔
581
    }
582
}
583

584
impl<R> ControlledReader<R> {
585
    fn keep_in_buffer(&mut self, v: &[u8]) {
9✔
586
        self.inner.get_mut().buffer.extend(v);
9✔
587
    }
588

589
    fn get_mut(&mut self) -> &mut R {
7✔
590
        &mut self.inner.get_mut().inner
7✔
591
    }
592

593
    fn get_available(&mut self) -> &[u8] {
7✔
594
        &self.inner.get_ref().buffer
7✔
595
    }
596

597
    fn consume_available(&mut self, n: usize) {
6✔
598
        let _ = self.inner.get_mut().buffer.drain(..n);
6✔
599
    }
600
}
601

602
#[derive(Debug)]
603
struct BufferedReader<R> {
604
    inner: R,
605
    buffer: Vec<u8>,
606
}
607

608
impl<R> BufferedReader<R> {
609
    fn new(reader: R) -> Self {
10✔
610
        Self {
611
            inner: reader,
612
            buffer: Vec::new(),
10✔
613
        }
614
    }
615
}
616

617
impl<R> Read for BufferedReader<R>
618
where
619
    R: Read,
620
{
621
    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
9✔
622
        if self.buffer.is_empty() {
12✔
623
            self.inner.read(buf)
6✔
624
        } else {
625
            let n = buf.write(&self.buffer)?;
6✔
626
            let _ = self.buffer.drain(..n);
3✔
627
            Ok(n)
3✔
628
        }
629
    }
630
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc