• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8131961689

03 Mar 2024 05:43PM UTC coverage: 55.931% (-0.6%) from 56.575%
8131961689

Pull #68

github

web-flow
Merge c48d7fe3a into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

219 of 376 new or added lines in 14 files covered. (58.24%)

155 existing lines in 6 files now uncovered.

1490 of 2664 relevant lines covered (55.93%)

3.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.53
/src/interact/session.rs
1
//! This module contains an [`InteractSession`] which runs an interact session with IO.
2

3
// todo: optimize PtyProcess wait_echo by not looping when the timeout is none
4

5
use std::{
6
    borrow::Cow,
7
    io::{ErrorKind, Write},
8
};
9

10
#[cfg(not(feature = "async"))]
11
use std::io::Read;
12

13
#[cfg(feature = "async")]
14
use std::{io, time::Duration};
15

16
#[cfg(feature = "async")]
17
use futures_timer::Delay;
18

19
#[cfg(feature = "async")]
20
use futures_lite::{
21
    future,
22
    io::{AsyncRead, AsyncWrite},
23
    AsyncReadExt, AsyncWriteExt,
24
};
25

26
use crate::{process::Healthcheck, Error};
27

28
#[cfg(unix)]
29
use crate::process::Termios;
30

31
#[cfg(not(feature = "async"))]
32
use crate::Expect;
33

34
#[cfg(feature = "async")]
35
use crate::AsyncExpect;
36

37
use crate::interact::Context;
38
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
39
use crate::process::NonBlocking;
40

41
#[cfg(unix)]
42
use crate::process::unix::WaitStatus;
43

44
/// Shorthand for a `Result` whose error type is the crate's [`Error`].
type ExpectResult<T> = Result<T, Error>;
45

46
/// InteractConfig represents options of an interactive session.
47
pub struct InteractSession<Session, Input, Output, State> {
48
    session: Session,
49
    input: Input,
50
    output: Output,
51
    escape_character: u8,
52
    #[cfg(unix)]
53
    status: Option<WaitStatus>,
54
    opts: InteractOptions<Session, Input, Output, State>,
55
}
56

57
/// Interact options (aka callbacks you can set to be callled being in an interactive mode).
58
struct InteractOptions<S, I, O, C> {
59
    state: C,
60
    input_filter: Option<OptFilter>,
61
    output_filter: Option<OptFilter>,
62
    input_action: Option<OptAction<S, I, O, C>>,
63
    output_action: Option<OptAction<S, I, O, C>>,
64
    idle_action: Option<OptAction<S, I, O, C>>,
65
}
66

67
/// A boxed callback which receives a [`Context`]; the interact loops in this
/// module stop and return once such a callback yields `Ok(true)`.
type OptAction<S, I, O, C> = Box<dyn FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool>>;
68

69
/// A boxed filter which takes a chunk of bytes and returns the bytes to be used
/// in its place (either borrowed or newly allocated).
type OptFilter = Box<dyn FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>>>;
70

71
impl<S, I, O, C> InteractSession<S, I, O, C> {
72
    /// Default escape character. <Ctrl-\]>
73
    pub const ESCAPE: u8 = 29;
74

75
    /// Creates a new object of [`InteractSession`].
76
    pub fn new(session: S, input: I, output: O, state: C) -> InteractSession<S, I, O, C> {
6✔
77
        InteractSession {
78
            input,
79
            output,
80
            session,
81
            escape_character: Self::ESCAPE,
82
            opts: InteractOptions {
6✔
83
                state,
84
                input_filter: None,
85
                output_filter: None,
86
                input_action: None,
87
                output_action: None,
88
                idle_action: None,
89
            },
90
            #[cfg(unix)]
91
            status: None,
92
        }
93
    }
94

95
    /// Sets an escape character after seen which the interact interactions will be stopped
96
    /// and controll will be returned to a caller process.
97
    pub fn set_escape_character(mut self, c: u8) -> Self {
×
98
        self.escape_character = c;
×
99
        self
×
100
    }
101

102
    /// Returns a status of spawned session if it was exited.
103
    ///
104
    /// If [`Self::spawn`] returns false but this method returns None it means that a child process was shutdown by various reasons.
105
    /// Which sometimes happens and it's not considered to be a valid [`WaitStatus`], so None is returned.
106
    ///
107
    /// [`Self::spawn`]: crate::interact::InteractSession::spawn
108
    #[cfg(unix)]
NEW
109
    pub fn get_status(&self) -> Option<WaitStatus> {
×
110
        self.status
×
111
    }
112
}
113

114
impl<S, I, O, C> InteractSession<S, I, O, C> {
115
    /// Set a state
116
    pub fn with_state<State>(self, state: State) -> InteractSession<S, I, O, State> {
2✔
117
        let mut s = InteractSession::new(self.session, self.input, self.output, state);
2✔
118
        s.escape_character = self.escape_character;
2✔
119
        #[cfg(unix)]
120
        {
121
            s.status = self.status;
2✔
122
        }
123

124
        s
2✔
125
    }
126

127
    /// Get a reference on state
NEW
128
    pub fn get_state(&self) -> &C {
×
NEW
129
        &self.opts.state
×
130
    }
131

132
    /// Get a mut reference on state
NEW
133
    pub fn get_state_mut(&mut self) -> &mut C {
×
NEW
134
        &mut self.opts.state
×
135
    }
136

137
    /// Returns a inner state.
138
    pub fn into_state(self) -> C {
1✔
139
        self.opts.state
1✔
140
    }
141

142
    /// Sets the output filter.
143
    /// The output_filter will be passed all the output from the child process.
144
    ///
145
    /// The filter isn't applied to user's `read` calls through the [`Context`] in callbacks.
146
    pub fn set_output_filter<F>(&mut self, filter: F) -> &mut Self
1✔
147
    where
148
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
149
    {
150
        self.opts.output_filter = Some(Box::new(filter));
2✔
NEW
151
        self
×
152
    }
153

154
    /// Sets the input filter.
155
    /// The input_filter will be passed all the keyboard input from the user.
156
    ///
157
    /// The input_filter is run BEFORE the check for the escape_character.
158
    /// The filter is called BEFORE calling a on_input callback if it's set.
159
    pub fn set_input_filter<F>(&mut self, filter: F) -> &mut Self
1✔
160
    where
161
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
162
    {
163
        self.opts.input_filter = Some(Box::new(filter));
2✔
NEW
164
        self
×
165
    }
166

167
    /// Puts a hanlder which will be called when users input is detected.
168
    ///
169
    /// Be aware that currently async version doesn't take a Session as an argument.
170
    /// See <https://github.com/zhiburt/expectrl/issues/16>.
171
    pub fn set_input_action<F>(&mut self, action: F) -> &mut Self
3✔
172
    where
173
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
174
    {
175
        self.opts.input_action = Some(Box::new(action));
6✔
NEW
176
        self
×
177
    }
178

179
    /// Puts a hanlder which will be called when process output is detected.
180
    ///
181
    /// IMPORTANT:
182
    ///
183
    /// Please be aware that your use of [Session::expect], [Session::check] and any `read` operation on session
184
    /// will cause the read bytes not to apeard in the output stream!
185
    pub fn set_output_action<F>(&mut self, action: F) -> &mut Self
4✔
186
    where
187
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
188
    {
189
        self.opts.output_action = Some(Box::new(action));
8✔
NEW
190
        self
×
191
    }
192

193
    /// Puts a handler which will be called on each interaction when no input is detected.
194
    pub fn set_idle_action<F>(&mut self, action: F) -> &mut Self
1✔
195
    where
196
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
197
    {
198
        self.opts.idle_action = Some(Box::new(action));
2✔
NEW
199
        self
×
200
    }
201
}
202

203
#[cfg(all(unix, not(any(feature = "async", feature = "polling"))))]
204
impl<S, I, O, C> InteractSession<S, I, O, C>
205
where
206
    I: Read,
207
    O: Write,
208
    S: Expect + Termios + Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
209
{
210
    /// Runs the session.
211
    ///
212
    /// See [`Session::interact`].
213
    ///
214
    /// [`Session::interact`]: crate::session::Session::interact
215
    pub fn spawn(&mut self) -> ExpectResult<bool> {
5✔
216
        let is_echo = self.session.is_echo()?;
5✔
217
        if !is_echo {
5✔
218
            let _ = self.session.set_echo(true);
5✔
219
        }
220

221
        self.status = None;
5✔
222
        let is_alive = interact_buzy_loop(self)?;
5✔
223

224
        if !is_echo {
5✔
225
            let _ = self.session.set_echo(false);
5✔
226
        }
227

228
        Ok(is_alive)
5✔
229
    }
230
}
231

232
#[cfg(all(windows, not(any(feature = "async", feature = "polling"))))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    I: Read,
    O: Write,
    S: Expect + Healthcheck + NonBlocking + Write + Read,
{
    /// Runs the session by handing it over to the busy loop.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn(&mut self) -> ExpectResult<bool> {
        let session = self;
        interact_buzy_loop(session)
    }
}
248

249
#[cfg(all(unix, not(feature = "async"), feature = "polling"))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    S: Expect
        + Termios
        + Healthcheck<Status = WaitStatus>
        + Write
        + Read
        + std::os::unix::io::AsRawFd,
{
    /// Runs the session.
    ///
    /// Echo is switched on for the duration of the interaction if it was off,
    /// and switched back off afterwards, including when the interaction
    /// ends with an error. Both switches are best-effort: their failures are ignored.
    ///
    /// Any previously recorded status is cleared before the interaction starts.
    ///
    /// See [`Session::interact`].
    ///
    /// # Errors
    ///
    /// Returns an error if the echo mode can't be queried or if the polling loop fails.
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn(&mut self) -> ExpectResult<bool> {
        // This impl is compiled on unix only, so there's no windows branch here.
        let is_echo = self.session.is_echo()?;
        if !is_echo {
            let _ = self.session.set_echo(true);
        }

        self.status = None;

        // The result is intentionally not propagated with `?` right here,
        // otherwise an error would skip the echo restoration below.
        let result = interact_polling(self);

        if !is_echo {
            let _ = self.session.set_echo(false);
        }

        result
    }
}
290

291
#[cfg(all(unix, feature = "async"))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
    S: AsyncExpect + Termios + Healthcheck<Status = WaitStatus> + AsyncWrite + AsyncRead + Unpin,
{
    /// Runs the session.
    ///
    /// Echo is switched on for the duration of the interaction if it was off,
    /// and switched back off afterwards, including when the interaction
    /// ends with an error. Both switches are best-effort: their failures are ignored.
    ///
    /// Any previously recorded status is cleared before the interaction starts,
    /// the same way the non-async versions do it.
    ///
    /// See [`Session::interact`].
    ///
    /// # Errors
    ///
    /// Returns an error if the echo mode can't be queried or if the interact loop fails.
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub async fn spawn(&mut self) -> Result<bool, Error> {
        let is_echo = self.session.is_echo().map_err(Error::IO)?;
        if !is_echo {
            let _ = self.session.set_echo(true);
        }

        // Don't let a status of a previous run leak into this one.
        self.status = None;

        // The result is intentionally not propagated with `?` right here,
        // otherwise an error would skip the echo restoration below.
        let result = interact_async(self).await;

        if !is_echo {
            let _ = self.session.set_echo(false);
        }

        result
    }
}
318

319
#[cfg(all(windows, feature = "async"))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
    S: AsyncExpect + Healthcheck + AsyncWrite + AsyncRead + Unpin,
{
    /// Runs the session by awaiting the async interact loop.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub async fn spawn(&mut self) -> Result<bool, Error> {
        let is_alive = interact_async(self).await?;
        Ok(is_alive)
    }
}
335

336
#[cfg(all(windows, feature = "polling", not(feature = "async")))]
impl<I, O, C> InteractSession<crate::session::OsSession, I, O, C>
where
    I: Read + Clone + Send + 'static,
    O: Write,
{
    /// Runs the session by handing it over to the thread based polling loop.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn(&mut self) -> Result<bool, Error> {
        let session = self;
        interact_polling_on_thread(session)
    }
}
351

352
impl<S, I, O, C> std::fmt::Debug for InteractSession<S, I, O, C>
353
where
354
    S: std::fmt::Debug,
355
    I: std::fmt::Debug,
356
    O: std::fmt::Debug,
357
    C: std::fmt::Debug,
358
{
359
    #[rustfmt::skip]
NEW
360
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
361
        let mut s = f.debug_struct("InteractSession");
×
NEW
362
        let _ = s.field("session", &self.session)
×
NEW
363
            .field("input", &self.input)
×
NEW
364
            .field("output", &self.output)
×
NEW
365
            .field("escape_character", &self.escape_character);
×
366

367
        #[cfg(unix)]
368
        {
NEW
369
            let _ = s.field("status", &self.status);
×
370
        }
371

NEW
372
        let _ = s
×
NEW
373
            .field("state", &std::ptr::addr_of!(self.opts.state))
×
NEW
374
            .field("opts:on_idle", &get_pointer(&self.opts.idle_action))
×
NEW
375
            .field("opts:on_input", &get_pointer(&self.opts.input_action))
×
NEW
376
            .field("opts:on_output", &get_pointer(&self.opts.output_action))
×
NEW
377
            .field("opts:input_filter", &get_pointer(&self.opts.input_filter))
×
NEW
378
            .field("opts:output_filter", &get_pointer(&self.opts.output_filter));
×
379

NEW
380
        s.finish()
×
381
    }
382
}
383

384
#[cfg(all(unix, not(feature = "async"), not(feature = "polling")))]
385
fn interact_buzy_loop<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> ExpectResult<bool>
5✔
386
where
387
    S: Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
388
    O: Write,
389
    I: Read,
390
{
391
    let mut buf = [0; 512];
5✔
392

393
    loop {
394
        let status = get_status(&s.session)?;
5✔
395
        if !matches!(status, Some(WaitStatus::StillAlive)) {
10✔
396
            s.status = status;
2✔
397
            return Ok(false);
2✔
398
        }
399

400
        if let Some(n) = try_read(&mut s.session, &mut buf)? {
6✔
401
            let eof = n == 0;
3✔
402
            let buf = &buf[..n];
3✔
403
            let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;
6✔
404

405
            let exit = run_action_output(s, &buf, eof)?;
6✔
406
            if eof || exit {
6✔
NEW
407
                return Ok(true);
×
408
            }
409

410
            spin_write(&mut s.output, &buf)?;
3✔
411
            spin_flush(&mut s.output)?;
6✔
412
        }
413

414
        // We dont't print user input back to the screen.
415
        // In terminal mode it will be ECHOed back automatically.
416
        // This way we preserve terminal seetings for example when user inputs password.
417
        // The terminal must have been prepared before.
418
        match s.input.read(&mut buf) {
3✔
419
            Ok(n) => {
3✔
420
                let eof = n == 0;
3✔
421
                let buf = &buf[..n];
6✔
422
                let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;
3✔
423

424
                #[rustfmt::skip]
425
                let exit = run_action_input(s, &buf, eof)?;
6✔
426
                if eof | exit {
3✔
427
                    return Ok(true);
3✔
428
                }
429

430
                let escape_char_position = buf.iter().position(|c| *c == s.escape_character);
12✔
431
                match escape_char_position {
3✔
432
                    Some(pos) => {
1✔
433
                        s.session.write_all(&buf[..pos])?;
2✔
434
                        return Ok(true);
1✔
435
                    }
436
                    None => {
437
                        s.session.write_all(&buf[..])?;
11✔
438
                    }
439
                }
440
            }
441
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
9✔
442
            Err(err) => return Err(err.into()),
×
443
        }
444

445
        let exit = run_action_idle(s, &[], false)?;
6✔
446
        if exit {
3✔
NEW
447
            return Ok(true);
×
448
        }
449
    }
450
}
451

452
/// Drives an interaction by repeatedly trying to read the process and the input.
///
/// On each iteration it:
///
/// 1. returns `Ok(false)` when the process isn't alive,
/// 2. forwards filtered process output to the output stream,
/// 3. forwards filtered user input to the process, up to the escape character,
/// 4. runs the idle callback.
///
/// `Ok(true)` is returned on EOF of either side, on the escape character
/// or when any callback asks to exit.
#[cfg(all(windows, not(feature = "async"), not(feature = "polling")))]
fn interact_buzy_loop<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> ExpectResult<bool>
where
    S: Healthcheck + NonBlocking + Write + Read,
    O: Write,
    I: Read,
{
    // A single scratch buffer shared by process reads and input reads.
    let mut chunk = [0; 512];

    loop {
        let alive = s.session.is_alive()?;
        if !alive {
            return Ok(false);
        }

        if let Some(n) = try_read(&mut s.session, &mut chunk)? {
            let eof = n == 0;
            let output = call_filter(s.opts.output_filter.as_mut(), &chunk[..n])?;

            let exit = run_action_output(s, &output, eof)?;
            if eof || exit {
                return Ok(true);
            }

            spin_write(&mut s.output, &output)?;
            spin_flush(&mut s.output)?;
        }

        // We don't print user input back to the screen.
        // In terminal mode it will be ECHOed back automatically.
        // This way we preserve terminal settings for example when user inputs password.
        // The terminal must have been prepared before.
        match s.input.read(&mut chunk) {
            Ok(n) => {
                let eof = n == 0;
                let typed = call_filter(s.opts.input_filter.as_mut(), &chunk[..n])?;

                let exit = run_action_input(s, &typed, eof)?;
                if eof || exit {
                    return Ok(true);
                }

                let escape = s.escape_character;
                if let Some(pos) = typed.iter().position(|byte| *byte == escape) {
                    // Only what was typed before the escape character reaches the process.
                    s.session.write_all(&typed[..pos])?;
                    return Ok(true);
                }

                s.session.write_all(&typed[..])?;
            }
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
            Err(err) => return Err(err.into()),
        }

        if run_action_idle(s, &[], false)? {
            return Ok(true);
        }
    }
}
516

517
/// Drives an interaction using a poller over the input and the session descriptors.
///
/// The input is registered under key `0` and the session under key `1`.
/// Each iteration checks the process status, waits up to 5 seconds for readiness events,
/// handles the reported events and then runs the idle callback.
///
/// `Ok(false)` is returned (after recording the status) when the process isn't reported as still alive;
/// `Ok(true)` is returned on EOF of either side, on the escape character
/// or when any callback asks to exit.
#[cfg(all(unix, not(feature = "async"), feature = "polling"))]
fn interact_polling<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> Result<bool, Error>
where
    S: Healthcheck<Status = WaitStatus> + Write + Read + std::os::unix::io::AsRawFd,
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
{
    use polling::{Event, Poller};

    // Create a poller and register interest in readability of both descriptors.
    let poller = Poller::new()?;
    poller.add(s.input.as_raw_fd(), Event::readable(0))?;
    poller.add(s.session.as_raw_fd(), Event::readable(1))?;

    let mut chunk = [0; 512];
    let mut events = Vec::new();

    // The event loop.
    loop {
        match get_status(&s.session)? {
            Some(WaitStatus::StillAlive) => {}
            other => {
                s.status = other;
                return Ok(false);
            }
        }

        // Wait for at least one I/O event (or for the wait to time out).
        events.clear();
        let _ = poller.wait(&mut events, Some(std::time::Duration::from_secs(5)))?;

        for ev in &events {
            match ev.key {
                0 => {
                    // We don't print user input back to the screen.
                    // In terminal mode it will be ECHOed back automatically.
                    // This way we preserve terminal settings for example when user inputs password.
                    // The terminal must have been prepared before.
                    match s.input.read(&mut chunk) {
                        Ok(n) => {
                            let eof = n == 0;
                            let typed = call_filter(s.opts.input_filter.as_mut(), &chunk[..n])?;

                            let exit = run_action_input(s, &typed, eof)?;
                            if eof || exit {
                                return Ok(true);
                            }

                            let escape = s.escape_character;
                            if let Some(pos) = typed.iter().position(|byte| *byte == escape) {
                                s.session.write_all(&typed[..pos]).map_err(Error::IO)?;
                                return Ok(true);
                            }

                            s.session.write_all(&typed[..])?;
                        }
                        Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                        Err(err) => return Err(err.into()),
                    }

                    // Set interest in the next readability event.
                    poller.modify(s.input.as_raw_fd(), Event::readable(0))?;
                }
                1 => {
                    match s.session.read(&mut chunk) {
                        Ok(n) => {
                            let eof = n == 0;
                            let output = call_filter(s.opts.output_filter.as_mut(), &chunk[..n])?;

                            let exit = run_action_output(s, &output, eof)?;
                            if eof || exit {
                                return Ok(true);
                            }

                            spin_write(&mut s.output, &output)?;
                            spin_flush(&mut s.output)?;
                        }
                        Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                        Err(err) => return Err(err.into()),
                    }

                    // Set interest in the next readability event.
                    poller.modify(s.session.as_raw_fd(), Event::readable(1))?;
                }
                _ => {}
            }
        }

        if run_action_idle(s, &[], false)? {
            return Ok(true);
        }
    }
}
611

612
/// Drives an interaction by waiting on events received from a `Wait2` waiter,
/// which is built from a clone of the input and a clone of the session stream.
///
/// Input and output arrive one byte (or an EOF marker) at a time.
///
/// `Ok(false)` is returned when the process is no longer alive;
/// `Ok(true)` is returned on EOF of either side, on the escape character
/// or when any callback asks to exit.
///
/// # Errors
///
/// Returns an error if the stream can't be cloned, the liveness check fails,
/// receiving an event fails, or any filter, callback or IO operation fails.
#[cfg(all(windows, not(feature = "async"), feature = "polling"))]
fn interact_polling_on_thread<I, O, C>(
    s: &mut InteractSession<crate::session::OsSession, I, O, C>,
) -> Result<bool, Error>
where
    I: Read + Clone + Send + 'static,
    O: Write,
{
    use crate::{
        error::to_io_error,
        waiter::{Recv, Wait2},
    };

    // Create a waiter over the user input and a clone of the process stream.
    let stream = s
        .session
        .get_stream()
        .try_clone()
        .map_err(to_io_error(""))?;
    let mut poller = Wait2::new(s.input.clone(), stream);

    loop {
        // Stop once the process has exited.
        //
        // The check must be negated: leaving the loop while the process is
        // alive would end the interaction before anything was exchanged.
        if !s.session.is_alive()? {
            return Ok(false);
        }

        // Wait for at least one I/O event.
        let event = poller.recv().map_err(to_io_error(""))?;
        match event {
            Recv::R1(b) => match b {
                Ok(b) => {
                    // `None` stands for EOF; then an empty slice is passed on.
                    let buf = b.map_or([0], |b| [b]);
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = &buf[..n];

                    let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;

                    let exit = run_action_input(s, &buf, eof)?;
                    if eof || exit {
                        return Ok(true);
                    }

                    // todo: replace all of these by 1 by 1 write
                    let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
                    match escape_char_pos {
                        Some(pos) => {
                            s.session.write_all(&buf[..pos])?;
                            return Ok(true);
                        }
                        None => s.session.write_all(&buf[..])?,
                    }
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            Recv::R2(b) => match b {
                Ok(b) => {
                    // `None` stands for EOF; then an empty slice is passed on.
                    let buf = b.map_or([0], |b| [b]);
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = &buf[..n];

                    let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                    let exit = run_action_output(s, &buf, eof)?;
                    if eof || exit {
                        return Ok(true);
                    }

                    s.output.write_all(&buf)?;
                    s.output.flush()?;
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            Recv::Timeout => {
                let exit = run_action_idle(s, &[], false)?;
                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
701

702
/// Drives an interaction by racing a read of the process, a read of the input
/// and a 5 second timeout, and handling whichever finishes first.
///
/// * Process output is passed through the output filter and the output callback,
///   and then written to the output stream in full.
/// * User input is passed through the input filter and the input callback,
///   and then written to the process up to the escape character.
/// * On timeout the idle callback is run.
///
/// `Ok(false)` is returned (after recording the status) when the process isn't reported as still alive;
/// `Ok(true)` is returned on EOF of either side, on the escape character
/// or when any callback asks to exit.
///
/// # Errors
///
/// Returns an error if the status check, a filter, a callback or an IO operation fails.
#[cfg(all(unix, feature = "async"))]
async fn interact_async<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> Result<bool, Error>
where
    S: Healthcheck<Status = WaitStatus> + AsyncRead + AsyncWrite + Unpin,
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
{
    /// Tells which of the raced futures has produced the result.
    #[derive(Debug)]
    enum ReadFrom {
        Input,
        Proc,
        Timeout,
    }

    const TIMEOUT: Duration = Duration::from_secs(5);
    let mut input_buf = [0; 512];
    let mut proc_buf = [0; 512];

    loop {
        let status = get_status(&s.session)?;
        if !matches!(status, Some(WaitStatus::StillAlive)) {
            s.status = status;
            return Ok(false);
        }

        let read_process = async { (ReadFrom::Proc, s.session.read(&mut proc_buf).await) };
        let read_input = async { (ReadFrom::Input, s.input.read(&mut input_buf).await) };
        let timeout = async { (ReadFrom::Timeout, async_timeout(TIMEOUT).await) };

        let read_any = future::or(read_process, read_input);
        let read_output = future::or(read_any, timeout).await;
        let read_target = read_output.0;
        let read_result = read_output.1;

        match read_target {
            ReadFrom::Proc => {
                let n = read_result?;
                let eof = n == 0;
                let buf = &proc_buf[..n];
                let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                let exit = run_action_output(s, &buf, eof)?;

                if eof || exit {
                    return Ok(true);
                }

                // `write` may accept only a part of the buffer,
                // so `write_all` is used not to lose process output.
                s.output.write_all(&buf).await?;
                s.output.flush().await?;
            }
            ReadFrom::Input => {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings for example when user inputs password.
                // The terminal must have been prepared before.
                match read_result {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &input_buf[..n];
                        // User input goes through the input filter (not the output one).
                        let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;

                        let exit = run_action_input(s, &buf, eof)?;

                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                s.session.write_all(&buf[..pos]).await?;
                                return Ok(true);
                            }
                            None => s.session.write_all(&buf[..]).await?,
                        }
                    }
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }
            }
            ReadFrom::Timeout => {
                let exit = run_action_idle(s, &[], false)?;
                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
791

792
/// Drives an interaction by racing a read of the process, a read of the input
/// and a 5 second timeout, and handling whichever finishes first.
///
/// * Process output is passed through the output filter and the output callback,
///   and then written to the output stream in full.
/// * User input is passed through the input filter and the input callback,
///   and then written to the process up to the escape character.
/// * On timeout the idle callback is run.
///
/// `Ok(false)` is returned when the process is no longer alive;
/// `Ok(true)` is returned on EOF of either side, on the escape character
/// or when any callback asks to exit.
///
/// # Errors
///
/// Returns an error if the liveness check, a filter, a callback or an IO operation fails.
#[cfg(all(windows, feature = "async"))]
async fn interact_async<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> Result<bool, Error>
where
    S: Healthcheck + AsyncRead + AsyncWrite + Unpin,
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
{
    /// Tells which of the raced futures has produced the result.
    #[derive(Debug)]
    enum ReadFrom {
        Input,
        Proc,
        Timeout,
    }

    const TIMEOUT: Duration = Duration::from_secs(5);
    let mut input_buf = [0; 512];
    let mut proc_buf = [0; 512];

    loop {
        if !s.session.is_alive()? {
            return Ok(false);
        }

        let read_process = async { (ReadFrom::Proc, s.session.read(&mut proc_buf).await) };
        let read_input = async { (ReadFrom::Input, s.input.read(&mut input_buf).await) };
        let timeout = async { (ReadFrom::Timeout, async_timeout(TIMEOUT).await) };

        let read_any = future::or(read_process, read_input);
        let read_output = future::or(read_any, timeout).await;
        let read_target = read_output.0;
        let read_result = read_output.1;

        match read_target {
            ReadFrom::Proc => {
                let n = read_result?;
                let eof = n == 0;
                let buf = &proc_buf[..n];
                let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                let exit = run_action_output(s, &buf, eof)?;

                if eof || exit {
                    return Ok(true);
                }

                // `write` may accept only a part of the buffer,
                // so `write_all` is used not to lose process output.
                s.output.write_all(&buf).await?;
                s.output.flush().await?;
            }
            ReadFrom::Input => {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings for example when user inputs password.
                // The terminal must have been prepared before.
                match read_result {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &input_buf[..n];
                        // User input goes through the input filter (not the output one).
                        let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;

                        let exit = run_action_input(s, &buf, eof)?;

                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                s.session.write_all(&buf[..pos]).await?;
                                return Ok(true);
                            }
                            None => s.session.write_all(&buf[..]).await?,
                        }
                    }
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }
            }
            ReadFrom::Timeout => {
                let exit = run_action_idle(s, &[], false)?;
                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
879

880
/// Waits for `timeout` to elapse and then resolves to `Ok(0)`.
///
/// The output type matches the one of a read, so that the future
/// can be raced against read futures.
#[cfg(feature = "async")]
async fn async_timeout(timeout: Duration) -> io::Result<usize> {
    let delay = Delay::new(timeout);
    delay.await;
    Ok(0)
}
885

886
/// Writes all of `buf` to `writer`, busy-waiting while the writer reports
/// [`std::io::ErrorKind::WouldBlock`].
///
/// Progress is tracked across retries: after a `WouldBlock` the write resumes at the
/// first byte that has not been accepted yet. Retrying `write_all` with the whole
/// buffer instead would re-send any prefix that was already written before the
/// writer started blocking.
///
/// An empty `buf` returns `Ok(())` without touching the writer.
///
/// # Errors
///
/// * [`std::io::ErrorKind::WriteZero`] if the writer accepts zero bytes while data
///   is still pending (same contract as `Write::write_all`).
/// * Any error returned by the writer other than `WouldBlock` or `Interrupted`.
fn spin_write<W>(mut writer: W, buf: &[u8]) -> std::io::Result<()>
where
    W: Write,
{
    let mut remaining = buf;

    while !remaining.is_empty() {
        match writer.write(remaining) {
            Ok(0) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ));
            }
            // Drop the accepted prefix so a later retry never repeats it.
            Ok(written) => remaining = &remaining[written..],
            Err(err) => match err.kind() {
                // Both kinds are transient: spin and try again from the same offset.
                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted => {}
                _ => return Err(err),
            },
        }
    }

    Ok(())
}
898

899
/// Flushes `writer`, retrying in a tight loop for as long as the flush fails with
/// [`std::io::ErrorKind::WouldBlock`].
///
/// # Errors
///
/// Returns the first error whose kind is not `WouldBlock`.
fn spin_flush<W>(mut writer: W) -> std::io::Result<()>
where
    W: Write,
{
    while let Err(failure) = writer.flush() {
        if failure.kind() != std::io::ErrorKind::WouldBlock {
            return Err(failure);
        }
        // `WouldBlock`: nothing to do but try again.
    }

    Ok(())
}
911

912
#[rustfmt::skip]
913
fn run_action_input<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
3✔
914
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, buf, eof);
3✔
915
    opt_action(ctx, &mut s.opts.input_action)
3✔
916
}
917

918
#[rustfmt::skip]
919
fn run_action_output<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
3✔
920
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, buf, eof);
3✔
921
    opt_action(ctx, &mut s.opts.output_action)
3✔
922
}
923

924
#[rustfmt::skip]
925
fn run_action_idle<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
3✔
926
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, buf, eof);
3✔
927
    opt_action(ctx, &mut s.opts.idle_action)
3✔
928
}
929

930
fn opt_action<S, I, O, C>(
3✔
931
    ctx: Context<'_, S, I, O, C>,
932
    opt: &mut Option<OptAction<S, I, O, C>>,
933
) -> ExpectResult<bool> {
934
    match opt {
3✔
935
        Some(action) => (action)(ctx),
2✔
936
        None => Ok(false),
937
    }
938
}
939

940
fn call_filter<F>(filter: Option<F>, buf: &[u8]) -> Result<Cow<'_, [u8]>, Error>
1✔
941
where
942
    F: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
943
{
944
    match filter {
1✔
945
        Some(mut action) => (action)(buf),
2✔
946
        None => Ok(Cow::Borrowed(buf)),
1✔
947
    }
948
}
949

950
#[cfg(unix)]
951
fn get_status<S>(session: &S) -> Result<Option<S::Status>, Error>
2✔
952
where
953
    S: Healthcheck,
954
{
955
    match session.get_status() {
2✔
956
        Ok(status) => Ok(Some(status)),
2✔
957
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
3✔
NEW
958
        Err(err) => Err(Error::IO(err)),
×
959
    }
960
}
961

962
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
963
fn try_read<S>(session: &mut S, buf: &mut [u8]) -> ExpectResult<Option<usize>>
1✔
964
where
965
    S: NonBlocking + Read,
966
{
967
    session.set_blocking(false)?;
1✔
968

969
    let result = session.read(buf);
1✔
970

971
    session.set_blocking(true)?;
3✔
972

973
    match result {
1✔
974
        Ok(n) => Ok(Some(n)),
1✔
975
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
3✔
NEW
976
        Err(err) => Err(Error::IO(err)),
×
977
    }
978
}
979

980
/// Returns the address of the value owned by the box in `ptr`, or `0` when `ptr`
/// is `None`.
///
/// The address is that of the boxed value itself, so it stays the same for as long
/// as the box is alive and differs between distinct live allocations. Taking
/// `addr_of!` of the closure argument instead would yield the address of a temporary
/// reference on the stack, which says nothing about the box.
fn get_pointer<T>(ptr: &Option<Box<T>>) -> usize
where
    T: ?Sized,
{
    ptr.as_deref()
        // Go through a thin `*const ()` first: a (possibly fat) `*const T` with
        // `T: ?Sized` cannot be cast to an integer directly.
        .map_or(0, |value| (value as *const T).cast::<()>() as usize)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc