• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8119237697

02 Mar 2024 01:11AM UTC coverage: 56.739% (+0.2%) from 56.575%
8119237697

Pull #68

github

web-flow
Merge d1e39b6e1 into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

226 of 300 new or added lines in 12 files covered. (75.33%)

141 existing lines in 5 files now uncovered.

1486 of 2619 relevant lines covered (56.74%)

3.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.56
/src/interact/session.rs
1
//! This module contains a [`InteractSession`] which runs an interact session with IO.
2

3
// todo: optimize PtyProcess wait_echo by not looping when timeout is none
4

5
use std::{
6
    borrow::Cow,
7
    io::{ErrorKind, Write},
8
};
9

10
#[cfg(not(feature = "async"))]
11
use std::io::Read;
12

13
use crate::{
14
    process::{Healthcheck, Termios},
15
    Error, Expect,
16
};
17

18
use crate::interact::Context;
19
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
20
use crate::process::NonBlocking;
21

22
#[cfg(unix)]
23
use crate::process::unix::WaitStatus;
24

25
type ExpectResult<T> = Result<T, Error>;
26

27
/// InteractSession represents an interactive session together with its options.
28
pub struct InteractSession<Session, Input, Output, State> {
29
    session: Session,
30
    input: Input,
31
    output: Output,
32
    escape_character: u8,
33
    #[cfg(unix)]
34
    status: Option<WaitStatus>,
35
    opts: InteractOptions<Session, Input, Output, State>,
36
}
37

38
/// Interact options (aka callbacks you can set to be called while in an interactive mode).
39
struct InteractOptions<S, I, O, C> {
40
    state: C,
41
    input_filter: Option<OptFilter>,
42
    output_filter: Option<OptFilter>,
43
    input_action: Option<OptAction<S, I, O, C>>,
44
    output_action: Option<OptAction<S, I, O, C>>,
45
    idle_action: Option<OptAction<S, I, O, C>>,
46
}
47

48
type OptAction<S, I, O, C> = Box<dyn FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool>>;
49

50
type OptFilter = Box<dyn FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>>>;
51

52
impl<S, I, O, C> InteractSession<S, I, O, C> {
53
    /// Default escape character. <Ctrl-\]>
54
    pub const ESCAPE: u8 = 29;
55

56
    /// Creates a new object of [`InteractSession`].
57
    pub fn new(session: S, input: I, output: O, state: C) -> InteractSession<S, I, O, C> {
6✔
58
        InteractSession {
59
            input,
60
            output,
61
            session,
62
            escape_character: Self::ESCAPE,
63
            opts: InteractOptions {
6✔
64
                state,
65
                input_filter: None,
66
                output_filter: None,
67
                input_action: None,
68
                output_action: None,
69
                idle_action: None,
70
            },
71
            #[cfg(unix)]
72
            status: None,
73
        }
74
    }
75

76
    /// Sets an escape character after seen which the interact interactions will be stopped
77
    /// and controll will be returned to a caller process.
78
    pub fn set_escape_character(mut self, c: u8) -> Self {
×
79
        self.escape_character = c;
×
80
        self
×
81
    }
82

83
    /// Returns a status of spawned session if it was exited.
84
    ///
85
    /// If [`Self::spawn`] returns false but this method returns None it means that a child process was shutdown by various reasons.
86
    /// Which sometimes happens and it's not considered to be a valid [`WaitStatus`], so None is returned.
87
    ///
88
    /// [`Self::spawn`]: crate::interact::InteractSession::spawn
89
    #[cfg(unix)]
NEW
90
    pub fn get_status(&self) -> Option<WaitStatus> {
×
91
        self.status
×
92
    }
93
}
94

95
impl<S, I, O, C> InteractSession<S, I, O, C> {
96
    /// Set a state
97
    pub fn with_state<State>(self, state: State) -> InteractSession<S, I, O, State> {
2✔
98
        let mut s = InteractSession::new(self.session, self.input, self.output, state);
2✔
99
        s.escape_character = self.escape_character;
2✔
100
        #[cfg(unix)]
101
        {
102
            s.status = self.status;
2✔
103
        }
104

105
        s
2✔
106
    }
107
    
108
    /// Get a reference on state
NEW
109
    pub fn get_state(&self) -> &C {
×
NEW
110
        &self.opts.state
×
111
    }
112

113
    /// Get a mut reference on state
NEW
114
    pub fn get_state_mut(&mut self) -> &mut C {
×
NEW
115
        &mut self.opts.state
×
116
    }
117

118
    /// Returns a inner state.
119
    pub fn into_state(self) -> C {
1✔
120
        self.opts.state
1✔
121
    }
122

123
    /// Sets the output filter.
124
    /// The output_filter will be passed all the output from the child process.
125
    ///
126
    /// The filter isn't applied to user's `read` calls through the [`Context`] in callbacks.
127
    pub fn set_output_filter<F>(&mut self, filter: F) -> &mut Self
1✔
128
    where
129
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
130
    {
131
        self.opts.output_filter = Some(Box::new(filter));
2✔
NEW
132
        self
×
133
    }
134

135
    /// Sets the input filter.
136
    /// The input_filter will be passed all the keyboard input from the user.
137
    ///
138
    /// The input_filter is run BEFORE the check for the escape_character.
139
    /// The filter is called BEFORE calling a on_input callback if it's set.
140
    pub fn set_input_filter<F>(&mut self, filter: F) -> &mut Self
1✔
141
    where
142
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
143
    {
144
        self.opts.input_filter = Some(Box::new(filter));
2✔
NEW
145
        self
×
146
    }
147

148
    /// Puts a hanlder which will be called when users input is detected.
149
    ///
150
    /// Be aware that currently async version doesn't take a Session as an argument.
151
    /// See <https://github.com/zhiburt/expectrl/issues/16>.
152
    pub fn set_input_action<F>(&mut self, action: F) -> &mut Self
3✔
153
    where
154
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
155
    {
156
        self.opts.input_action = Some(Box::new(action));
6✔
NEW
157
        self
×
158
    }
159

160
    /// Puts a hanlder which will be called when process output is detected.
161
    ///
162
    /// IMPORTANT:
163
    ///
164
    /// Please be aware that your use of [Session::expect], [Session::check] and any `read` operation on session
165
    /// will cause the read bytes not to apeard in the output stream!
166
    pub fn set_output_action<F>(&mut self, action: F) -> &mut Self
4✔
167
    where
168
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
169
    {
170
        self.opts.output_action = Some(Box::new(action));
8✔
NEW
171
        self
×
172
    }
173

174
    /// Puts a handler which will be called on each interaction when no input is detected.
175
    pub fn set_idle_action<F>(&mut self, action: F) -> &mut Self
1✔
176
    where
177
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
178
    {
179
        self.opts.idle_action = Some(Box::new(action));
2✔
NEW
180
        self
×
181
    }
182
}
183

184
#[cfg(not(any(feature = "async", feature = "polling")))]
185
impl<S, I, O, C> InteractSession<S, I, O, C>
186
where
187
    I: Read,
188
    O: Write,
189
    S: Expect + Termios + Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
190
{
191
    /// Runs the session.
192
    ///
193
    /// See [`Session::interact`].
194
    ///
195
    /// [`Session::interact`]: crate::session::Session::interact
196
    pub fn spawn(&mut self) -> ExpectResult<bool> {
5✔
197
        #[cfg(unix)]
198
        {
199
            let is_echo = self.session.is_echo()?;
5✔
200
            if !is_echo {
5✔
201
                let _ = self.session.set_echo(true);
5✔
202
            }
203

204
            self.status = None;
5✔
205
            let is_alive = interact_buzy_loop(self)?;
5✔
206

207
            if !is_echo {
5✔
208
                let _ = self.session.set_echo(false);
5✔
209
            }
210

211
            Ok(is_alive)
5✔
212
        }
213

214
        #[cfg(windows)]
215
        {
NEW
216
            interact_buzy_loop(self)
×
217
        }
218
    }
219
}
220

221
#[cfg(all(unix, feature = "polling", not(feature = "async")))]
impl<S, I, O> InteractSession<&mut Session<OsProcess, S>, I, O>
where
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    S: Write + Read + std::os::unix::io::AsRawFd,
{
    /// Runs the session.
    ///
    /// Echo is switched on for the duration of the interaction when it was off,
    /// and switched back off afterwards, including when the interaction itself fails.
    ///
    /// See [`Session::interact`].
    ///
    /// # Errors
    ///
    /// Fails when the echo mode can't be queried or when the polling loop fails.
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    {
        let is_echo = self
            .session
            .get_process()
            .get_echo()
            .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
        if !is_echo {
            let _ = self.session.get_process_mut().set_echo(true, None);
        }

        self.status = None;
        // Keep the outcome instead of returning early with `?`,
        // otherwise an error would leave the echo mode switched on.
        let result = interact_polling(self, ops.borrow_mut());

        if !is_echo {
            let _ = self.session.get_process_mut().set_echo(false, None);
        }

        result
    }
}
261

262
#[cfg(feature = "async")]
impl<S, I, O> InteractSession<&mut Session<OsProcess, S>, I, O>
where
    I: futures_lite::AsyncRead + Unpin,
    O: Write,
    S: futures_lite::AsyncRead + futures_lite::AsyncWrite + Unpin,
{
    /// Runs the session.
    ///
    /// On unix, echo is switched on for the duration of the interaction when it was off,
    /// and switched back off afterwards, including when the interaction itself fails.
    ///
    /// See [`Session::interact`].
    ///
    /// # Errors
    ///
    /// Fails when the echo mode can't be queried or when the interaction loop fails.
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub async fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut opts: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    {
        #[cfg(unix)]
        {
            let is_echo = self
                .session
                .get_echo()
                .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
            if !is_echo {
                let _ = self.session.set_echo(true, None);
            }

            // Keep the outcome instead of returning early with `?`,
            // otherwise an error would leave the echo mode switched on.
            let result = interact_async(self, opts.borrow_mut()).await;

            if !is_echo {
                let _ = self.session.set_echo(false, None);
            }

            result
        }

        #[cfg(windows)]
        {
            interact_async(self, opts.borrow_mut()).await
        }
    }
}
308

309
#[cfg(all(windows, feature = "polling", not(feature = "async")))]
impl<I, O> InteractSession<&mut Session, I, O>
where
    I: Read + Clone + Send + 'static,
    O: Write,
{
    /// Runs the session by handing it, together with the borrowed options,
    /// to the thread based polling loop.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
    {
        let options = ops.borrow_mut();
        interact_polling_on_thread(self, options)
    }
}
332

333
impl<S, I, O, C> std::fmt::Debug for InteractSession<S, I, O, C>
334
where
335
    S: std::fmt::Debug,
336
    I: std::fmt::Debug,
337
    O: std::fmt::Debug,
338
    C: std::fmt::Debug,
339
{
340
    #[rustfmt::skip]
NEW
341
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
342
        f.debug_struct("InteractSession")
×
NEW
343
            .field("session", &self.session)
×
NEW
344
            .field("input", &self.input)
×
NEW
345
            .field("output", &self.output)
×
NEW
346
            .field("escape_character", &self.escape_character)
×
NEW
347
            .field("status", &self.status)
×
NEW
348
            .field("state", &std::ptr::addr_of!(self.opts.state))
×
NEW
349
            .field("opts:on_idle", &get_pointer(&self.opts.idle_action))
×
NEW
350
            .field("opts:on_input", &get_pointer(&self.opts.input_action))
×
NEW
351
            .field("opts:on_output", &get_pointer(&self.opts.output_action))
×
NEW
352
            .field("opts:input_filter", &get_pointer(&self.opts.input_filter))
×
NEW
353
            .field("opts:output_filter", &get_pointer(&self.opts.output_filter))
×
354
            .finish()
355
    }
356
}
357

358
#[cfg(all(unix, not(feature = "async"), not(feature = "polling")))]
359
fn interact_buzy_loop<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> ExpectResult<bool>
5✔
360
where
361
    S: Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
362
    O: Write,
363
    I: Read,
364
{
365
    let mut buf = [0; 512];
5✔
366

367
    loop {
368
        let status = get_status(&s.session)?;
5✔
369
        if !matches!(status, Some(WaitStatus::StillAlive)) {
10✔
370
            s.status = status;
2✔
371
            return Ok(false);
2✔
372
        }
373

374
        if let Some(n) = try_read(&mut s.session, &mut buf)? {
6✔
375
            let eof = n == 0;
3✔
376
            let buf = &buf[..n];
3✔
377
            let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;
6✔
378

379
            #[rustfmt::skip]
380
            let exit = opt_action(
381
                Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof),
6✔
382
                &mut s.opts.output_action,
3✔
383
            )?;
384
            if eof || exit {
6✔
NEW
385
                return Ok(true);
×
386
            }
387

388
            spin_write(&mut s.output, &buf)?;
3✔
389
            spin_flush(&mut s.output)?;
6✔
390
        }
391

392
        // We dont't print user input back to the screen.
393
        // In terminal mode it will be ECHOed back automatically.
394
        // This way we preserve terminal seetings for example when user inputs password.
395
        // The terminal must have been prepared before.
396
        match s.input.read(&mut buf) {
3✔
397
            Ok(n) => {
3✔
398
                let eof = n == 0;
3✔
399
                let buf = &buf[..n];
6✔
400
                let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;
3✔
401

402
                #[rustfmt::skip]
403
                let exit = opt_action(
404
                    Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof),
6✔
405
                    &mut s.opts.input_action,
3✔
406
                )?;
407
                if eof | exit {
3✔
408
                    return Ok(true);
3✔
409
                }
410

411
                let escape_char_position = buf.iter().position(|c| *c == s.escape_character);
12✔
412
                match escape_char_position {
3✔
413
                    Some(pos) => {
1✔
414
                        s.session.write_all(&buf[..pos])?;
2✔
415
                        return Ok(true);
1✔
416
                    }
417
                    None => {
418
                        s.session.write_all(&buf[..])?;
11✔
419
                    }
420
                }
421
            }
422
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
9✔
423
            Err(err) => return Err(err.into()),
×
424
        }
425

426
        #[rustfmt::skip]
427
        let exit = opt_action(
428
            Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, false),
3✔
429
            &mut s.opts.idle_action,
3✔
430
        )?;
431
        if exit {
3✔
UNCOV
432
            return Ok(true);
×
433
        }
434
    }
435
}
436

437
/// Drives an interactive session using a poller over the user input and the process stream.
///
/// The user input is registered under key `0` and the process stream under key `1`.
/// Every round waits up to 5 seconds for readiness, handles each reported event and
/// then calls the idle action.
///
/// Returns `Ok(false)` (recording the status) once the process is no longer reported as
/// still alive, and `Ok(true)` when an end of a stream is seen, an action asks to exit
/// or the escape character is found in the input.
#[cfg(all(unix, not(feature = "async"), feature = "polling"))]
fn interact_polling<S, O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session<OsProcess, S>, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    S: Write + Read + std::os::unix::io::AsRawFd,
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
{
    use polling::{Event, Poller};

    // Register interest in readability of both ends.
    let poller = Poller::new()?;
    poller.add(interact.input.as_raw_fd(), Event::readable(0))?;
    poller.add(
        interact.session.get_stream().as_raw_fd(),
        Event::readable(1),
    )?;

    let mut scratch = [0; 512];
    let mut ready = Vec::new();

    loop {
        let status = get_status(interact.session)?;
        if !matches!(status, Some(crate::WaitStatus::StillAlive)) {
            interact.status = status;
            return Ok(false);
        }

        // Wait for at least one I/O event (or the timeout).
        ready.clear();
        let _ = poller.wait(&mut ready, Some(std::time::Duration::from_secs(5)))?;

        for event in &ready {
            match event.key {
                0 => {
                    // User input is not printed back to the screen here.
                    // In terminal mode it is ECHOed back automatically, which keeps
                    // terminal settings intact, for example while a password is typed.
                    // The terminal must have been prepared before.
                    let read = match interact.input.read(&mut scratch) {
                        Ok(n) => Some(n),
                        Err(err) if err.kind() == ErrorKind::WouldBlock => None,
                        Err(err) => return Err(err.into()),
                    };

                    if let Some(n) = read {
                        let is_eof = n == 0;
                        let data = call_filter(opts.input_filter.as_mut(), &scratch[..n])?;

                        let stop = call_action(
                            opts.input_action.as_mut(),
                            interact.session,
                            &mut interact.input,
                            &mut interact.output,
                            &mut opts.state,
                            &data,
                            is_eof,
                        )?;

                        if is_eof || stop {
                            return Ok(true);
                        }

                        match data.iter().position(|c| *c == interact.escape_character) {
                            Some(pos) => {
                                interact.session.write_all(&data[..pos]).map_err(Error::IO)?;
                                return Ok(true);
                            }
                            None => interact.session.write_all(&data[..])?,
                        }
                    }

                    // Set interest in the next readability event.
                    poller.modify(interact.input.as_raw_fd(), Event::readable(0))?;
                }
                1 => {
                    let read = match interact.session.read(&mut scratch) {
                        Ok(n) => Some(n),
                        Err(err) if err.kind() == ErrorKind::WouldBlock => None,
                        Err(err) => return Err(err.into()),
                    };

                    if let Some(n) = read {
                        let is_eof = n == 0;
                        let data = call_filter(opts.output_filter.as_mut(), &scratch[..n])?;

                        let stop = call_action(
                            opts.output_action.as_mut(),
                            interact.session,
                            &mut interact.input,
                            &mut interact.output,
                            &mut opts.state,
                            &data,
                            is_eof,
                        )?;

                        if is_eof || stop {
                            return Ok(true);
                        }

                        spin_write(&mut interact.output, &data)?;
                        spin_flush(&mut interact.output)?;
                    }

                    // Set interest in the next readability event.
                    poller.modify(
                        interact.session.get_stream().as_raw_fd(),
                        Event::readable(1),
                    )?;
                }
                _ => {}
            }
        }

        let stop = call_action(
            opts.idle_action.as_mut(),
            interact.session,
            &mut interact.input,
            &mut interact.output,
            &mut opts.state,
            &[],
            false,
        )?;

        if stop {
            return Ok(true);
        }
    }
}
572

573
/// Drives an interactive session on windows by waiting on the user input and on a clone
/// of the process stream through a `Wait2` waiter, which delivers the data byte by byte.
///
/// Returns `Ok(false)` once the process is no longer alive, and `Ok(true)` when an end of
/// a stream is seen, an action asks to exit or the escape character is found in the input.
/// The idle action is called when the waiter reports a timeout.
#[cfg(all(windows, not(feature = "async"), feature = "polling"))]
fn interact_polling_on_thread<O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    I: Read + Clone + Send + 'static,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
{
    use crate::{
        error::to_io_error,
        waiter::{Recv, Wait2},
    };

    // Create a waiter over the user input and a clone of the process stream.
    let stream = interact
        .session
        .get_stream()
        .try_clone()
        .map_err(to_io_error(""))?;
    let mut poller = Wait2::new(interact.input.clone(), stream);

    loop {
        // Stop once the process is gone; `false` tells the caller it is not alive.
        // (The check used to be inverted, which ended the loop for a live process.)
        if !interact.session.is_alive()? {
            return Ok(false);
        }

        // Wait for at least one I/O event.
        let event = poller.recv().map_err(to_io_error(""))?;
        match event {
            Recv::R1(b) => match b {
                Ok(b) => {
                    // `None` marks the end of the input, otherwise a single byte was read.
                    let buf = b.map_or([0], |b| [b]);
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = &buf[..n];

                    let buf = call_filter(opts.input_filter.as_mut(), buf)?;

                    let exit = call_action(
                        opts.input_action.as_mut(),
                        interact.session,
                        &mut interact.input,
                        &mut interact.output,
                        &mut opts.state,
                        &buf,
                        eof,
                    )?;

                    if eof || exit {
                        return Ok(true);
                    }

                    // todo: replace all of these by 1 by 1 write
                    let escape_char_pos = buf.iter().position(|c| *c == interact.escape_character);
                    match escape_char_pos {
                        Some(pos) => {
                            interact.session.write_all(&buf[..pos])?;
                            return Ok(true);
                        }
                        None => interact.session.write_all(&buf[..])?,
                    }
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            Recv::R2(b) => match b {
                Ok(b) => {
                    // `None` marks the end of the output, otherwise a single byte was read.
                    let buf = b.map_or([0], |b| [b]);
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = &buf[..n];

                    let buf = call_filter(opts.output_filter.as_mut(), buf)?;

                    let exit = call_action(
                        opts.output_action.as_mut(),
                        interact.session,
                        &mut interact.input,
                        &mut interact.output,
                        &mut opts.state,
                        &buf,
                        eof,
                    )?;

                    if eof || exit {
                        return Ok(true);
                    }

                    interact.output.write_all(&buf)?;
                    interact.output.flush()?;
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            Recv::Timeout => {
                let exit = call_action(
                    opts.idle_action.as_mut(),
                    interact.session,
                    &mut interact.input,
                    &mut interact.output,
                    &mut opts.state,
                    &[],
                    false,
                )?;

                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
695

696
/// Drives an interactive session asynchronously.
///
/// Every round races three futures: a read of the process output, a read of the user
/// input and a 5 second timer, then handles whichever completed:
/// - process output goes through the output filter and the output action and is
///   forwarded to the output;
/// - user input goes through the input filter and the input action and is forwarded
///   to the process, up to the escape character if there is one;
/// - on timeout the idle action is called.
///
/// Returns `Ok(false)` once the process is no longer alive, and `Ok(true)` when an end of
/// a stream is seen, an action asks to exit or the escape character is found in the input.
#[cfg(feature = "async")]
async fn interact_async<S, O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session<OsProcess, S>, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    S: futures_lite::AsyncRead + futures_lite::AsyncWrite + Unpin,
    I: futures_lite::AsyncRead + Unpin,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
{
    use std::io;

    use futures_lite::{AsyncReadExt, AsyncWriteExt};

    let mut stdin_buf = [0; 512];
    let mut proc_buf = [0; 512];
    loop {
        #[cfg(unix)]
        {
            let status = get_status(interact.session)?;
            if !matches!(status, Some(crate::WaitStatus::StillAlive)) {
                interact.status = status;
                return Ok(false);
            }
        }

        #[cfg(windows)]
        {
            if !interact.session.is_alive()? {
                return Ok(false);
            }
        }

        /// Tells which of the raced futures completed first.
        #[derive(Debug)]
        enum ReadFrom {
            Stdin,
            OsProcessess,
            Timeout,
        }

        let read_process = async {
            (
                ReadFrom::OsProcessess,
                interact.session.read(&mut proc_buf).await,
            )
        };
        let read_stdin = async { (ReadFrom::Stdin, interact.input.read(&mut stdin_buf).await) };
        let timeout = async {
            (
                ReadFrom::Timeout,
                async {
                    futures_timer::Delay::new(std::time::Duration::from_secs(5)).await;
                    io::Result::Ok(0)
                }
                .await,
            )
        };

        let read_fut = futures_lite::future::or(read_process, read_stdin);
        let (read_from, result) = futures_lite::future::or(read_fut, timeout).await;

        match read_from {
            ReadFrom::OsProcessess => {
                let n = result?;
                let eof = n == 0;
                let buf = &proc_buf[..n];
                let buf = call_filter(opts.output_filter.as_mut(), buf)?;

                let exit = call_action(
                    opts.output_action.as_mut(),
                    interact.session,
                    &mut interact.input,
                    &mut interact.output,
                    &mut opts.state,
                    &buf,
                    eof,
                )?;

                if eof || exit {
                    return Ok(true);
                }

                spin_write(&mut interact.output, &buf)?;
                spin_flush(&mut interact.output)?;
            }
            ReadFrom::Stdin => {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings for example when user inputs password.
                // The terminal must have been prepared before.
                match result {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &stdin_buf[..n];
                        // User input must go through the input filter
                        // (the output filter used to be applied here by mistake).
                        let buf = call_filter(opts.input_filter.as_mut(), buf)?;

                        let exit = call_action(
                            opts.input_action.as_mut(),
                            interact.session,
                            &mut interact.input,
                            &mut interact.output,
                            &mut opts.state,
                            &buf,
                            eof,
                        )?;

                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos =
                            buf.iter().position(|c| *c == interact.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                interact.session.write_all(&buf[..pos]).await?;
                                return Ok(true);
                            }
                            None => interact.session.write_all(&buf[..]).await?,
                        }
                    }
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }
            }
            ReadFrom::Timeout => {
                let exit = call_action(
                    opts.idle_action.as_mut(),
                    interact.session,
                    &mut interact.input,
                    &mut interact.output,
                    &mut opts.state,
                    &[],
                    false,
                )?;

                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
843

844
/// Writes the whole `buf` into `writer`, spinning while the writer reports `WouldBlock`.
///
/// The position reached so far is kept across retries, so bytes which were already
/// accepted before a `WouldBlock` are never written a second time.
///
/// # Errors
///
/// Returns any error of the writer other than `WouldBlock` and `Interrupted`,
/// and a `WriteZero` error if the writer stops accepting bytes.
fn spin_write<W>(mut writer: W, buf: &[u8]) -> std::io::Result<()>
where
    W: Write,
{
    let mut rest = buf;

    while !rest.is_empty() {
        match writer.write(rest) {
            Ok(0) => return Err(std::io::Error::from(std::io::ErrorKind::WriteZero)),
            Ok(n) => rest = &rest[n..],
            // Not ready yet (or interrupted): try again from where we stopped.
            Err(err)
                if matches!(
                    err.kind(),
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted
                ) => {}
            Err(err) => return Err(err),
        }
    }

    Ok(())
}
856

857
/// Flushes `writer`, trying again for as long as it reports `WouldBlock`.
///
/// # Errors
///
/// Returns the first flush error whose kind is not `WouldBlock`.
fn spin_flush<W>(mut writer: W) -> std::io::Result<()>
where
    W: Write,
{
    loop {
        let outcome = writer.flush();
        let would_block =
            matches!(&outcome, Err(err) if err.kind() == std::io::ErrorKind::WouldBlock);

        if !would_block {
            return outcome;
        }
    }
}
869

870
fn opt_action<S, I, O, C>(
3✔
871
    ctx: Context<'_, S, I, O, C>,
872
    opt: &mut Option<OptAction<S, I, O, C>>,
873
) -> ExpectResult<bool> {
874
    match opt {
3✔
875
        Some(action) => (action)(ctx),
2✔
876
        None => Ok(false),
877
    }
878
}
879

880
fn call_filter<F>(filter: Option<F>, buf: &[u8]) -> Result<Cow<'_, [u8]>, Error>
1✔
881
where
882
    F: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
883
{
884
    match filter {
1✔
885
        Some(mut action) => (action)(buf),
2✔
886
        None => Ok(Cow::Borrowed(buf)),
1✔
887
    }
888
}
889

890
#[cfg(unix)]
891
fn get_status<S>(session: &S) -> Result<Option<S::Status>, Error>
2✔
892
where
893
    S: Healthcheck,
894
{
895
    match session.get_status() {
2✔
896
        Ok(status) => Ok(Some(status)),
2✔
897
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
3✔
NEW
898
        Err(err) => Err(Error::IO(err)),
×
899
    }
900
}
901

902
#[cfg(unix)]
903
fn try_read<S>(session: &mut S, buf: &mut [u8]) -> ExpectResult<Option<usize>>
1✔
904
where
905
    S: NonBlocking + Read,
906
{
907
    session.set_blocking(false)?;
1✔
908

909
    let result = session.read(buf);
1✔
910

911
    session.set_blocking(true)?;
3✔
912

913
    match result {
1✔
914
        Ok(n) => Ok(Some(n)),
1✔
915
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
3✔
NEW
916
        Err(err) => Err(Error::IO(err)),
×
917
    }
918
}
919

920
/// Returns the address of the boxed value, or `0` when there is no box.
///
/// The address is the one of the heap value the box points to (any pointer metadata of
/// unsized values is dropped), so it is the same on every call for the same box.
fn get_pointer<T>(ptr: &Option<Box<T>>) -> usize
where
    T: ?Sized,
{
    ptr.as_deref()
        .map_or(0, |value| (value as *const T).cast::<()>() as usize)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc