• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8126127478

02 Mar 2024 11:30PM UTC coverage: 47.529% (-9.0%) from 56.575%
8126127478

Pull #68

github

web-flow
Merge e7c63072c into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

148 of 344 new or added lines in 14 files covered. (43.02%)

728 existing lines in 15 files now uncovered.

1491 of 3137 relevant lines covered (47.53%)

3.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.04
/src/interact/session.rs
1
//! This module contains a [`InteractSession`] which runs an interact session with IO.
2

3
// todo: PtyProcess wait_echo optimize by not looping when timout is none
4

5
use std::{
6
    borrow::Cow,
7
    io::{ErrorKind, Write},
8
};
9

10
#[cfg(not(feature = "async"))]
11
use std::io::Read;
12

13
#[cfg(feature = "async")]
14
use std::{io, time::Duration};
15

16
#[cfg(feature = "async")]
17
use futures_timer::Delay;
18

19
#[cfg(feature = "async")]
20
use futures_lite::{
21
    future,
22
    io::{AsyncRead, AsyncWrite},
23
    AsyncReadExt, AsyncWriteExt,
24
};
25

26
use crate::{
27
    process::{Healthcheck, Termios},
28
    Error,
29
};
30

31
#[cfg(not(feature = "async"))]
32
use crate::Expect;
33

34
#[cfg(feature = "async")]
35
use crate::AsyncExpect;
36

37
use crate::interact::Context;
38
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
39
use crate::process::NonBlocking;
40

41
#[cfg(unix)]
42
use crate::process::unix::WaitStatus;
43

44
type ExpectResult<T> = Result<T, Error>;
45

46
/// InteractConfig represents options of an interactive session.
47
pub struct InteractSession<Session, Input, Output, State> {
48
    session: Session,
49
    input: Input,
50
    output: Output,
51
    escape_character: u8,
52
    #[cfg(unix)]
53
    status: Option<WaitStatus>,
54
    opts: InteractOptions<Session, Input, Output, State>,
55
}
56

57
/// Interact options (aka callbacks you can set to be callled being in an interactive mode).
58
struct InteractOptions<S, I, O, C> {
59
    state: C,
60
    input_filter: Option<OptFilter>,
61
    output_filter: Option<OptFilter>,
62
    input_action: Option<OptAction<S, I, O, C>>,
63
    output_action: Option<OptAction<S, I, O, C>>,
64
    idle_action: Option<OptAction<S, I, O, C>>,
65
}
66

67
type OptAction<S, I, O, C> = Box<dyn FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool>>;
68

69
type OptFilter = Box<dyn FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>>>;
70

71
impl<S, I, O, C> InteractSession<S, I, O, C> {
72
    /// Default escape character. <Ctrl-\]>
73
    pub const ESCAPE: u8 = 29;
74

75
    /// Creates a new object of [`InteractSession`].
76
    pub fn new(session: S, input: I, output: O, state: C) -> InteractSession<S, I, O, C> {
3✔
77
        InteractSession {
78
            input,
79
            output,
80
            session,
81
            escape_character: Self::ESCAPE,
82
            opts: InteractOptions {
3✔
83
                state,
84
                input_filter: None,
85
                output_filter: None,
86
                input_action: None,
87
                output_action: None,
88
                idle_action: None,
89
            },
90
            #[cfg(unix)]
91
            status: None,
92
        }
93
    }
94

95
    /// Sets an escape character after seen which the interact interactions will be stopped
96
    /// and controll will be returned to a caller process.
97
    pub fn set_escape_character(mut self, c: u8) -> Self {
×
98
        self.escape_character = c;
×
99
        self
×
100
    }
101

102
    /// Returns a status of spawned session if it was exited.
103
    ///
104
    /// If [`Self::spawn`] returns false but this method returns None it means that a child process was shutdown by various reasons.
105
    /// Which sometimes happens and it's not considered to be a valid [`WaitStatus`], so None is returned.
106
    ///
107
    /// [`Self::spawn`]: crate::interact::InteractSession::spawn
108
    #[cfg(unix)]
NEW
109
    pub fn get_status(&self) -> Option<WaitStatus> {
×
110
        self.status
×
111
    }
112
}
113

114
impl<S, I, O, C> InteractSession<S, I, O, C> {
115
    /// Set a state
NEW
116
    pub fn with_state<State>(self, state: State) -> InteractSession<S, I, O, State> {
×
NEW
117
        let mut s = InteractSession::new(self.session, self.input, self.output, state);
×
NEW
118
        s.escape_character = self.escape_character;
×
119
        #[cfg(unix)]
120
        {
NEW
121
            s.status = self.status;
×
122
        }
123

NEW
124
        s
×
125
    }
126

127
    /// Get a reference on state
NEW
128
    pub fn get_state(&self) -> &C {
×
NEW
129
        &self.opts.state
×
130
    }
131

132
    /// Get a mut reference on state
NEW
133
    pub fn get_state_mut(&mut self) -> &mut C {
×
NEW
134
        &mut self.opts.state
×
135
    }
136

137
    /// Returns a inner state.
138
    pub fn into_state(self) -> C {
1✔
139
        self.opts.state
1✔
140
    }
141

142
    /// Sets the output filter.
143
    /// The output_filter will be passed all the output from the child process.
144
    ///
145
    /// The filter isn't applied to user's `read` calls through the [`Context`] in callbacks.
146
    pub fn set_output_filter<F>(&mut self, filter: F) -> &mut Self
147
    where
148
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
149
    {
NEW
150
        self.opts.output_filter = Some(Box::new(filter));
×
NEW
151
        self
×
152
    }
153

154
    /// Sets the input filter.
155
    /// The input_filter will be passed all the keyboard input from the user.
156
    ///
157
    /// The input_filter is run BEFORE the check for the escape_character.
158
    /// The filter is called BEFORE calling a on_input callback if it's set.
159
    pub fn set_input_filter<F>(&mut self, filter: F) -> &mut Self
160
    where
161
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
162
    {
NEW
163
        self.opts.input_filter = Some(Box::new(filter));
×
NEW
164
        self
×
165
    }
166

167
    /// Puts a hanlder which will be called when users input is detected.
168
    ///
169
    /// Be aware that currently async version doesn't take a Session as an argument.
170
    /// See <https://github.com/zhiburt/expectrl/issues/16>.
171
    pub fn set_input_action<F>(&mut self, action: F) -> &mut Self
172
    where
173
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
174
    {
NEW
175
        self.opts.input_action = Some(Box::new(action));
×
NEW
176
        self
×
177
    }
178

179
    /// Puts a hanlder which will be called when process output is detected.
180
    ///
181
    /// IMPORTANT:
182
    ///
183
    /// Please be aware that your use of [Session::expect], [Session::check] and any `read` operation on session
184
    /// will cause the read bytes not to apeard in the output stream!
185
    pub fn set_output_action<F>(&mut self, action: F) -> &mut Self
1✔
186
    where
187
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
188
    {
189
        self.opts.output_action = Some(Box::new(action));
2✔
NEW
190
        self
×
191
    }
192

193
    /// Puts a handler which will be called on each interaction when no input is detected.
194
    pub fn set_idle_action<F>(&mut self, action: F) -> &mut Self
195
    where
196
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
197
    {
NEW
198
        self.opts.idle_action = Some(Box::new(action));
×
NEW
199
        self
×
200
    }
201
}
202

203
#[cfg(not(any(feature = "async", feature = "polling")))]
204
impl<S, I, O, C> InteractSession<S, I, O, C>
205
where
206
    I: Read,
207
    O: Write,
208
    S: Expect + Termios + Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
209
{
210
    /// Runs the session.
211
    ///
212
    /// See [`Session::interact`].
213
    ///
214
    /// [`Session::interact`]: crate::session::Session::interact
NEW
215
    pub fn spawn(&mut self) -> ExpectResult<bool> {
×
216
        #[cfg(unix)]
217
        {
NEW
218
            let is_echo = self.session.is_echo()?;
×
219
            if !is_echo {
×
NEW
220
                let _ = self.session.set_echo(true);
×
221
            }
222

UNCOV
223
            self.status = None;
×
NEW
224
            let is_alive = interact_buzy_loop(self)?;
×
225

UNCOV
226
            if !is_echo {
×
NEW
227
                let _ = self.session.set_echo(false);
×
228
            }
229

UNCOV
230
            Ok(is_alive)
×
231
        }
232

233
        #[cfg(windows)]
234
        {
NEW
235
            interact_buzy_loop(self)
×
236
        }
237
    }
238
}
239

240
#[cfg(all(unix, feature = "polling", not(feature = "async")))]
241
impl<S, I, O> InteractSession<&mut Session<OsProcess, S>, I, O>
242
where
243
    I: Read + std::os::unix::io::AsRawFd,
244
    O: Write,
245
    S: Write + Read + std::os::unix::io::AsRawFd,
246
{
247
    /// Runs the session.
248
    ///
249
    /// See [`Session::interact`].
250
    ///
251
    /// [`Session::interact`]: crate::session::Session::interact
252
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
253
    where
254
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
255
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
256
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
257
        IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
258
        OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
259
        WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
260
    {
261
        let is_echo = self
×
262
            .session
×
263
            .get_process()
264
            .get_echo()
265
            .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
×
266
        if !is_echo {
×
267
            let _ = self.session.get_process_mut().set_echo(true, None);
×
268
        }
269

270
        self.status = None;
×
271
        let is_alive = interact_polling(self, ops.borrow_mut())?;
×
272

273
        if !is_echo {
×
274
            let _ = self.session.get_process_mut().set_echo(false, None);
×
275
        }
276

277
        Ok(is_alive)
×
278
    }
279
}
280

281
#[cfg(feature = "async")]
282
impl<S, I, O, C> InteractSession<S, I, O, C>
283
where
284
    I: AsyncRead + Unpin,
285
    O: AsyncWrite + Unpin,
286
    S: AsyncExpect + Termios + Healthcheck<Status = WaitStatus> + AsyncWrite + AsyncRead + Unpin,
287
{
288
    /// Runs the session.
289
    ///
290
    /// See [`Session::interact`].
291
    ///
292
    /// [`Session::interact`]: crate::session::Session::interact
293
    pub async fn spawn(&mut self) -> Result<bool, Error> {
18✔
294
        #[cfg(unix)]
295
        {
296
            let is_echo = self.session.is_echo().map_err(Error::IO)?;
6✔
297
            if !is_echo {
3✔
298
                let _ = self.session.set_echo(true);
6✔
299
            }
300

301
            let is_alive = interact_async(self).await?;
6✔
302

303
            if !is_echo {
3✔
304
                let _ = self.session.set_echo(false);
6✔
305
            }
306

307
            Ok(is_alive)
3✔
308
        }
309

310
        #[cfg(windows)]
311
        {
312
            interact_async(self, opts.borrow_mut()).await
×
313
        }
314
    }
315
}
316

317
#[cfg(all(windows, feature = "polling", not(feature = "async")))]
318
impl<I, O> InteractSession<&mut Session, I, O>
319
where
320
    I: Read + Clone + Send + 'static,
321
    O: Write,
322
{
323
    /// Runs the session.
324
    ///
325
    /// See [`Session::interact`].
326
    ///
327
    /// [`Session::interact`]: crate::session::Session::interact
328
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
329
    where
330
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
331
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
332
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
333
        IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
334
        OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
335
        WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
336
    {
337
        interact_polling_on_thread(self, ops.borrow_mut())
×
338
    }
339
}
340

341
impl<S, I, O, C> std::fmt::Debug for InteractSession<S, I, O, C>
342
where
343
    S: std::fmt::Debug,
344
    I: std::fmt::Debug,
345
    O: std::fmt::Debug,
346
    C: std::fmt::Debug,
347
{
348
    #[rustfmt::skip]
NEW
349
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
350
        f.debug_struct("InteractSession")
×
NEW
351
            .field("session", &self.session)
×
NEW
352
            .field("input", &self.input)
×
NEW
353
            .field("output", &self.output)
×
NEW
354
            .field("escape_character", &self.escape_character)
×
NEW
355
            .field("status", &self.status)
×
NEW
356
            .field("state", &std::ptr::addr_of!(self.opts.state))
×
NEW
357
            .field("opts:on_idle", &get_pointer(&self.opts.idle_action))
×
NEW
358
            .field("opts:on_input", &get_pointer(&self.opts.input_action))
×
NEW
359
            .field("opts:on_output", &get_pointer(&self.opts.output_action))
×
NEW
360
            .field("opts:input_filter", &get_pointer(&self.opts.input_filter))
×
NEW
361
            .field("opts:output_filter", &get_pointer(&self.opts.output_filter))
×
362
            .finish()
363
    }
364
}
365

366
#[cfg(all(unix, not(feature = "async"), not(feature = "polling")))]
367
fn interact_buzy_loop<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> ExpectResult<bool>
368
where
369
    S: Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
370
    O: Write,
371
    I: Read,
372
{
373
    let mut buf = [0; 512];
374

375
    loop {
376
        let status = get_status(&s.session)?;
377
        if !matches!(status, Some(WaitStatus::StillAlive)) {
378
            s.status = status;
379
            return Ok(false);
380
        }
381

382
        if let Some(n) = try_read(&mut s.session, &mut buf)? {
383
            let eof = n == 0;
384
            let buf = &buf[..n];
385
            let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;
386

387
            #[rustfmt::skip]
388
            let exit = opt_action(
389
                Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof),
390
                &mut s.opts.output_action,
391
            )?;
392
            if eof || exit {
393
                return Ok(true);
394
            }
395

396
            spin_write(&mut s.output, &buf)?;
397
            spin_flush(&mut s.output)?;
398
        }
399

400
        // We dont't print user input back to the screen.
401
        // In terminal mode it will be ECHOed back automatically.
402
        // This way we preserve terminal seetings for example when user inputs password.
403
        // The terminal must have been prepared before.
404
        match s.input.read(&mut buf) {
405
            Ok(n) => {
406
                let eof = n == 0;
407
                let buf = &buf[..n];
408
                let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;
409

410
                #[rustfmt::skip]
411
                let exit = opt_action(
412
                    Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof),
413
                    &mut s.opts.input_action,
414
                )?;
415
                if eof | exit {
416
                    return Ok(true);
417
                }
418

419
                let escape_char_position = buf.iter().position(|c| *c == s.escape_character);
420
                match escape_char_position {
421
                    Some(pos) => {
422
                        s.session.write_all(&buf[..pos])?;
423
                        return Ok(true);
424
                    }
425
                    None => {
426
                        s.session.write_all(&buf[..])?;
427
                    }
428
                }
429
            }
430
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
431
            Err(err) => return Err(err.into()),
432
        }
433

434
        #[rustfmt::skip]
435
        let exit = opt_action(
436
            Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, false),
437
            &mut s.opts.idle_action,
438
        )?;
439
        if exit {
440
            return Ok(true);
441
        }
442
    }
443
}
444

445
#[cfg(all(unix, not(feature = "async"), feature = "polling"))]
446
fn interact_polling<S, O, I, C, IF, OF, IA, OA, WA>(
447
    interact: &mut InteractSession<&mut Session<OsProcess, S>, I, O>,
448
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
449
) -> Result<bool, Error>
450
where
451
    S: Write + Read + std::os::unix::io::AsRawFd,
452
    I: Read + std::os::unix::io::AsRawFd,
453
    O: Write,
454
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
455
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
456
    IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
457
    OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
458
    WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
459
{
460
    use polling::{Event, Poller};
461

462
    // Create a poller and register interest in readability on the socket.
463
    let poller = Poller::new()?;
464
    poller.add(interact.input.as_raw_fd(), Event::readable(0))?;
465
    poller.add(
466
        interact.session.get_stream().as_raw_fd(),
467
        Event::readable(1),
468
    )?;
469

470
    let mut buf = [0; 512];
471

472
    // The event loop.
473
    let mut events = Vec::new();
474
    loop {
475
        let status = get_status(interact.session)?;
476
        if !matches!(status, Some(crate::WaitStatus::StillAlive)) {
477
            interact.status = status;
478
            return Ok(false);
479
        }
480

481
        // Wait for at least one I/O event.
482
        events.clear();
483
        let _ = poller.wait(&mut events, Some(std::time::Duration::from_secs(5)))?;
484

485
        for ev in &events {
486
            if ev.key == 0 {
487
                // We dont't print user input back to the screen.
488
                // In terminal mode it will be ECHOed back automatically.
489
                // This way we preserve terminal seetings for example when user inputs password.
490
                // The terminal must have been prepared before.
491
                match interact.input.read(&mut buf) {
492
                    Ok(n) => {
493
                        let eof = n == 0;
494
                        let buf = &buf[..n];
495
                        let buf = call_filter(opts.input_filter.as_mut(), buf)?;
496

497
                        let exit = call_action(
498
                            opts.input_action.as_mut(),
499
                            interact.session,
500
                            &mut interact.input,
501
                            &mut interact.output,
502
                            &mut opts.state,
503
                            &buf,
504
                            eof,
505
                        )?;
506

507
                        if eof || exit {
508
                            return Ok(true);
509
                        }
510

511
                        let escape_char_pos =
512
                            buf.iter().position(|c| *c == interact.escape_character);
513
                        match escape_char_pos {
514
                            Some(pos) => {
515
                                interact.session.write_all(&buf[..pos]).map_err(Error::IO)?;
516
                                return Ok(true);
517
                            }
518
                            None => interact.session.write_all(&buf[..])?,
519
                        }
520
                    }
521
                    Err(err) if err.kind() == ErrorKind::WouldBlock => {}
522
                    Err(err) => return Err(err.into()),
523
                }
524

525
                // Set interest in the next readability event.
526
                poller.modify(interact.input.as_raw_fd(), Event::readable(0))?;
527
            }
528

529
            if ev.key == 1 {
530
                match interact.session.read(&mut buf) {
531
                    Ok(n) => {
532
                        let eof = n == 0;
533
                        let buf = &buf[..n];
534
                        let buf = call_filter(opts.output_filter.as_mut(), buf)?;
535

536
                        let exit = call_action(
537
                            opts.output_action.as_mut(),
538
                            interact.session,
539
                            &mut interact.input,
540
                            &mut interact.output,
541
                            &mut opts.state,
542
                            &buf,
543
                            eof,
544
                        )?;
545

546
                        if eof || exit {
547
                            return Ok(true);
548
                        }
549

550
                        spin_write(&mut interact.output, &buf)?;
551
                        spin_flush(&mut interact.output)?;
552
                    }
553
                    Err(err) if err.kind() == ErrorKind::WouldBlock => {}
554
                    Err(err) => return Err(err.into()),
555
                }
556

557
                // Set interest in the next readability event.
558
                poller.modify(
559
                    interact.session.get_stream().as_raw_fd(),
560
                    Event::readable(1),
561
                )?;
562
            }
563
        }
564

565
        let exit = call_action(
566
            opts.idle_action.as_mut(),
567
            interact.session,
568
            &mut interact.input,
569
            &mut interact.output,
570
            &mut opts.state,
571
            &[],
572
            false,
573
        )?;
574

575
        if exit {
576
            return Ok(true);
577
        }
578
    }
579
}
580

581
#[cfg(all(windows, not(feature = "async"), feature = "polling"))]
582
fn interact_polling_on_thread<O, I, C, IF, OF, IA, OA, WA>(
583
    interact: &mut InteractSession<&mut Session, I, O>,
584
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
585
) -> Result<bool, Error>
586
where
587
    I: Read + Clone + Send + 'static,
588
    O: Write,
589
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
590
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
591
    IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
592
    OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
593
    WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
594
{
595
    use crate::{
596
        error::to_io_error,
597
        waiter::{Recv, Wait2},
598
    };
599

600
    // Create a poller and register interest in readability on the socket.
601
    let stream = interact
602
        .session
603
        .get_stream()
604
        .try_clone()
605
        .map_err(to_io_error(""))?;
606
    let mut poller = Wait2::new(interact.input.clone(), stream);
607

608
    loop {
609
        // In case where proceses exits we are trying to
610
        // fill buffer to run callbacks if there was something in.
611
        //
612
        // We ignore errors because there might be errors like EOCHILD etc.
613
        if interact.session.is_alive()? {
614
            return Ok(false);
615
        }
616

617
        // Wait for at least one I/O event.
618
        let event = poller.recv().map_err(to_io_error(""))?;
619
        match event {
620
            Recv::R1(b) => match b {
621
                Ok(b) => {
622
                    let buf = b.map_or([0], |b| [b]);
623
                    let eof = b.is_none();
624
                    let n = if eof { 0 } else { 1 };
625
                    let buf = &buf[..n];
626

627
                    let buf = call_filter(opts.input_filter.as_mut(), buf)?;
628

629
                    let exit = call_action(
630
                        opts.input_action.as_mut(),
631
                        interact.session,
632
                        &mut interact.input,
633
                        &mut interact.output,
634
                        &mut opts.state,
635
                        &buf,
636
                        eof,
637
                    )?;
638

639
                    if eof || exit {
640
                        return Ok(true);
641
                    }
642

643
                    // todo: replace all of these by 1 by 1 write
644
                    let escape_char_pos = buf.iter().position(|c| *c == interact.escape_character);
645
                    match escape_char_pos {
646
                        Some(pos) => {
647
                            interact.session.write_all(&buf[..pos])?;
648
                            return Ok(true);
649
                        }
650
                        None => interact.session.write_all(&buf[..])?,
651
                    }
652
                }
653
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
654
                Err(err) => return Err(err.into()),
655
            },
656
            Recv::R2(b) => match b {
657
                Ok(b) => {
658
                    let buf = b.map_or([0], |b| [b]);
659
                    let eof = b.is_none();
660
                    let n = if eof { 0 } else { 1 };
661
                    let buf = &buf[..n];
662

663
                    let buf = call_filter(opts.output_filter.as_mut(), buf)?;
664

665
                    let exit = call_action(
666
                        opts.output_action.as_mut(),
667
                        interact.session,
668
                        &mut interact.input,
669
                        &mut interact.output,
670
                        &mut opts.state,
671
                        &buf,
672
                        eof,
673
                    )?;
674

675
                    if eof || exit {
676
                        return Ok(true);
677
                    }
678

679
                    interact.output.write_all(&buf)?;
680
                    interact.output.flush()?;
681
                }
682
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
683
                Err(err) => return Err(err.into()),
684
            },
685
            Recv::Timeout => {
686
                let exit = call_action(
687
                    opts.idle_action.as_mut(),
688
                    interact.session,
689
                    &mut interact.input,
690
                    &mut interact.output,
691
                    &mut opts.state,
692
                    &[],
693
                    false,
694
                )?;
695

696
                if exit {
697
                    return Ok(true);
698
                }
699
            }
700
        }
701
    }
702
}
703

704
#[cfg(all(unix, feature = "async"))]
705
async fn interact_async<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> Result<bool, Error>
6✔
706
where
707
    S: Healthcheck<Status = WaitStatus> + AsyncRead + AsyncWrite + Unpin,
708
    I: AsyncRead + Unpin,
709
    O: AsyncWrite + Unpin,
710
{
711
    #[derive(Debug)]
712
    enum ReadFrom {
713
        Input,
714
        Proc,
715
        Timeout,
716
    }
717

718
    const TIMEOUT: Duration = Duration::from_secs(5);
719
    let mut input_buf = [0; 512];
3✔
720
    let mut proc_buf = [0; 512];
3✔
721

722
    loop {
4✔
723
        let status = get_status(&s.session)?;
6✔
724
        if !matches!(status, Some(WaitStatus::StillAlive)) {
6✔
725
            s.status = status;
2✔
726
            return Ok(false);
2✔
727
        }
728

729
        let read_process = async { (ReadFrom::Proc, s.session.read(&mut proc_buf).await) };
3✔
730
        let read_input = async { (ReadFrom::Input, s.input.read(&mut input_buf).await) };
3✔
731
        let timeout = async { (ReadFrom::Timeout, async_timeout(TIMEOUT).await) };
1✔
732

733
        let read_any = future::or(read_process, read_input);
1✔
734
        let read_output = future::or(read_any, timeout).await;
2✔
735
        let read_target = read_output.0;
1✔
736
        let read_result = read_output.1;
1✔
737

738
        match read_target {
1✔
739
            ReadFrom::Proc => {
740
                let n = read_result?;
2✔
741
                let eof = n == 0;
1✔
742
                let buf = &proc_buf[..n];
2✔
743
                let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;
1✔
744

745
                let exit = run_action_output(s, &buf, eof)?;
2✔
746

747
                if eof || exit {
2✔
UNCOV
748
                    return Ok(true);
×
749
                }
750

751
                s.output.write(&buf).await?;
2✔
752
                s.output.flush().await?;
3✔
753
            }
754
            ReadFrom::Input => {
755
                // We dont't print user input back to the screen.
756
                // In terminal mode it will be ECHOed back automatically.
757
                // This way we preserve terminal seetings for example when user inputs password.
758
                // The terminal must have been prepared before.
759
                match read_result {
1✔
760
                    Ok(n) => {
1✔
761
                        let eof = n == 0;
1✔
762
                        let buf = &input_buf[..n];
2✔
763
                        let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;
1✔
764

765
                        let exit = run_action_input(s, &buf, eof)?;
2✔
766

767
                        if eof || exit {
2✔
768
                            return Ok(true);
1✔
769
                        }
770

771
                        let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
3✔
772
                        match escape_char_pos {
1✔
UNCOV
773
                            Some(pos) => {
×
NEW
774
                                s.session.write_all(&buf[..pos]).await?;
×
UNCOV
775
                                return Ok(true);
×
776
                            }
777
                            None => s.session.write_all(&buf[..]).await?,
4✔
778
                        }
779
                    }
780
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
3✔
UNCOV
781
                    Err(err) => return Err(err.into()),
×
782
                }
783
            }
784
            ReadFrom::Timeout => {
NEW
785
                let exit = run_action_idle(s, &[], false)?;
×
UNCOV
786
                if exit {
×
UNCOV
787
                    return Ok(true);
×
788
                }
789
            }
790
        }
791
    }
792
}
793

794
#[cfg(feature = "async")]
NEW
795
async fn async_timeout(timeout: Duration) -> io::Result<usize> {
×
NEW
796
    Delay::new(timeout).await;
×
797
    io::Result::Ok(0)
798
}
799

800
fn spin_write<W>(mut writer: W, buf: &[u8]) -> std::io::Result<()>
801
where
802
    W: Write,
803
{
804
    loop {
805
        match writer.write_all(buf) {
806
            Ok(_) => return Ok(()),
807
            Err(err) if err.kind() != std::io::ErrorKind::WouldBlock => return Err(err),
808
            Err(_) => (),
809
        }
810
    }
811
}
812

813
fn spin_flush<W>(mut writer: W) -> std::io::Result<()>
814
where
815
    W: Write,
816
{
817
    loop {
818
        match writer.flush() {
819
            Ok(_) => return Ok(()),
820
            Err(err) if err.kind() != std::io::ErrorKind::WouldBlock => return Err(err),
821
            Err(_) => (),
822
        }
823
    }
824
}
825

826
#[rustfmt::skip]
827
fn run_action_input<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
1✔
828
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof);
1✔
829
    opt_action(ctx, &mut s.opts.input_action)
1✔
830
}
831

832
#[rustfmt::skip]
833
fn run_action_output<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
1✔
834
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof);
1✔
835
    opt_action(ctx, &mut s.opts.output_action)
1✔
836
}
837

838
#[rustfmt::skip]
NEW
839
fn run_action_idle<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
×
NEW
840
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof);
×
NEW
841
    opt_action(ctx, &mut s.opts.idle_action)
×
842
}
843

844
fn opt_action<S, I, O, C>(
1✔
845
    ctx: Context<'_, S, I, O, C>,
846
    opt: &mut Option<OptAction<S, I, O, C>>,
847
) -> ExpectResult<bool> {
848
    match opt {
1✔
NEW
849
        Some(action) => (action)(ctx),
×
850
        None => Ok(false),
851
    }
852
}
853

854
fn call_filter<F>(filter: Option<F>, buf: &[u8]) -> Result<Cow<'_, [u8]>, Error>
1✔
855
where
856
    F: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
857
{
858
    match filter {
1✔
UNCOV
859
        Some(mut action) => (action)(buf),
×
860
        None => Ok(Cow::Borrowed(buf)),
1✔
861
    }
862
}
863

864
#[cfg(unix)]
865
fn get_status<S>(session: &S) -> Result<Option<S::Status>, Error>
2✔
866
where
867
    S: Healthcheck,
868
{
869
    match session.get_status() {
2✔
870
        Ok(status) => Ok(Some(status)),
2✔
NEW
871
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
×
NEW
872
        Err(err) => Err(Error::IO(err)),
×
873
    }
874
}
875

876
#[cfg(unix)]
877
#[cfg(not(feature = "async"))]
878
fn try_read<S>(session: &mut S, buf: &mut [u8]) -> ExpectResult<Option<usize>>
879
where
880
    S: NonBlocking + Read,
881
{
882
    session.set_blocking(false)?;
883

884
    let result = session.read(buf);
885

886
    session.set_blocking(true)?;
887

888
    match result {
889
        Ok(n) => Ok(Some(n)),
890
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
891
        Err(err) => Err(Error::IO(err)),
892
    }
893
}
894

895
fn get_pointer<T>(ptr: &Option<Box<T>>) -> usize
896
where
897
    T: ?Sized,
898
{
899
    ptr.as_ref().map_or(0, |f| std::ptr::addr_of!(f) as usize)
900
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc