• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 8129528574

03 Mar 2024 10:26AM UTC coverage: 47.348% (-9.2%) from 56.575%
8129528574

Pull #68

github

web-flow
Merge 43d30f6c4 into e8b43ff1b
Pull Request #68: [WIP] Move to trait based version `Expect`

159 of 380 new or added lines in 15 files covered. (41.84%)

724 existing lines in 16 files now uncovered.

1491 of 3149 relevant lines covered (47.35%)

3.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.11
/src/interact/session.rs
1
//! This module contains an [`InteractSession`] which runs an interact session with IO.
2

3
// todo: PtyProcess wait_echo: optimize by not looping when the timeout is none
4

5
use std::{
6
    borrow::Cow,
7
    io::{ErrorKind, Write},
8
};
9

10
#[cfg(not(feature = "async"))]
11
use std::io::Read;
12

13
#[cfg(feature = "async")]
14
use std::{io, time::Duration};
15

16
#[cfg(feature = "async")]
17
use futures_timer::Delay;
18

19
#[cfg(feature = "async")]
20
use futures_lite::{
21
    future,
22
    io::{AsyncRead, AsyncWrite},
23
    AsyncReadExt, AsyncWriteExt,
24
};
25

26
use crate::{
27
    process::Healthcheck,
28
    Error,
29
};
30

31
#[cfg(unix)]
32
use crate::process::Termios;
33

34
#[cfg(not(feature = "async"))]
35
use crate::Expect;
36

37
#[cfg(feature = "async")]
38
use crate::AsyncExpect;
39

40
use crate::interact::Context;
41
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
42
use crate::process::NonBlocking;
43

44
#[cfg(unix)]
45
use crate::process::unix::WaitStatus;
46

47
/// Shorthand for a [`Result`] whose error type is the crate's [`Error`].
type ExpectResult<T> = Result<T, Error>;
48

49
/// InteractConfig represents options of an interactive session.
50
pub struct InteractSession<Session, Input, Output, State> {
51
    session: Session,
52
    input: Input,
53
    output: Output,
54
    escape_character: u8,
55
    #[cfg(unix)]
56
    status: Option<WaitStatus>,
57
    opts: InteractOptions<Session, Input, Output, State>,
58
}
59

60
/// Interact options (aka callbacks you can set to be callled being in an interactive mode).
61
struct InteractOptions<S, I, O, C> {
62
    state: C,
63
    input_filter: Option<OptFilter>,
64
    output_filter: Option<OptFilter>,
65
    input_action: Option<OptAction<S, I, O, C>>,
66
    output_action: Option<OptAction<S, I, O, C>>,
67
    idle_action: Option<OptAction<S, I, O, C>>,
68
}
69

70
/// A boxed callback which receives the interaction [`Context`].
///
/// The interact loops treat the value obtained from running an action as an
/// "exit" request: `true` stops the interaction.
type OptAction<S, I, O, C> = Box<dyn FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool>>;
71

72
/// A boxed filter which maps a chunk of read bytes to the bytes which are
/// processed further (either borrowed as is or a newly allocated buffer).
type OptFilter = Box<dyn FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>>>;
73

74
impl<S, I, O, C> InteractSession<S, I, O, C> {
75
    /// Default escape character. <Ctrl-\]>
76
    pub const ESCAPE: u8 = 29;
77

78
    /// Creates a new object of [`InteractSession`].
79
    pub fn new(session: S, input: I, output: O, state: C) -> InteractSession<S, I, O, C> {
3✔
80
        InteractSession {
81
            input,
82
            output,
83
            session,
84
            escape_character: Self::ESCAPE,
85
            opts: InteractOptions {
3✔
86
                state,
87
                input_filter: None,
88
                output_filter: None,
89
                input_action: None,
90
                output_action: None,
91
                idle_action: None,
92
            },
93
            #[cfg(unix)]
94
            status: None,
95
        }
96
    }
97

98
    /// Sets an escape character after seen which the interact interactions will be stopped
99
    /// and controll will be returned to a caller process.
100
    pub fn set_escape_character(mut self, c: u8) -> Self {
×
101
        self.escape_character = c;
×
102
        self
×
103
    }
104

105
    /// Returns a status of spawned session if it was exited.
106
    ///
107
    /// If [`Self::spawn`] returns false but this method returns None it means that a child process was shutdown by various reasons.
108
    /// Which sometimes happens and it's not considered to be a valid [`WaitStatus`], so None is returned.
109
    ///
110
    /// [`Self::spawn`]: crate::interact::InteractSession::spawn
111
    #[cfg(unix)]
NEW
112
    pub fn get_status(&self) -> Option<WaitStatus> {
×
113
        self.status
×
114
    }
115
}
116

117
impl<S, I, O, C> InteractSession<S, I, O, C> {
118
    /// Set a state
NEW
119
    pub fn with_state<State>(self, state: State) -> InteractSession<S, I, O, State> {
×
NEW
120
        let mut s = InteractSession::new(self.session, self.input, self.output, state);
×
NEW
121
        s.escape_character = self.escape_character;
×
122
        #[cfg(unix)]
123
        {
NEW
124
            s.status = self.status;
×
125
        }
126

NEW
127
        s
×
128
    }
129

130
    /// Get a reference on state
NEW
131
    pub fn get_state(&self) -> &C {
×
NEW
132
        &self.opts.state
×
133
    }
134

135
    /// Get a mut reference on state
NEW
136
    pub fn get_state_mut(&mut self) -> &mut C {
×
NEW
137
        &mut self.opts.state
×
138
    }
139

140
    /// Returns a inner state.
141
    pub fn into_state(self) -> C {
1✔
142
        self.opts.state
1✔
143
    }
144

145
    /// Sets the output filter.
146
    /// The output_filter will be passed all the output from the child process.
147
    ///
148
    /// The filter isn't applied to user's `read` calls through the [`Context`] in callbacks.
149
    pub fn set_output_filter<F>(&mut self, filter: F) -> &mut Self
150
    where
151
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
152
    {
NEW
153
        self.opts.output_filter = Some(Box::new(filter));
×
NEW
154
        self
×
155
    }
156

157
    /// Sets the input filter.
158
    /// The input_filter will be passed all the keyboard input from the user.
159
    ///
160
    /// The input_filter is run BEFORE the check for the escape_character.
161
    /// The filter is called BEFORE calling a on_input callback if it's set.
162
    pub fn set_input_filter<F>(&mut self, filter: F) -> &mut Self
163
    where
164
        F: FnMut(&[u8]) -> ExpectResult<Cow<'_, [u8]>> + 'static,
165
    {
NEW
166
        self.opts.input_filter = Some(Box::new(filter));
×
NEW
167
        self
×
168
    }
169

170
    /// Puts a hanlder which will be called when users input is detected.
171
    ///
172
    /// Be aware that currently async version doesn't take a Session as an argument.
173
    /// See <https://github.com/zhiburt/expectrl/issues/16>.
174
    pub fn set_input_action<F>(&mut self, action: F) -> &mut Self
175
    where
176
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
177
    {
NEW
178
        self.opts.input_action = Some(Box::new(action));
×
NEW
179
        self
×
180
    }
181

182
    /// Puts a hanlder which will be called when process output is detected.
183
    ///
184
    /// IMPORTANT:
185
    ///
186
    /// Please be aware that your use of [Session::expect], [Session::check] and any `read` operation on session
187
    /// will cause the read bytes not to apeard in the output stream!
188
    pub fn set_output_action<F>(&mut self, action: F) -> &mut Self
1✔
189
    where
190
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
191
    {
192
        self.opts.output_action = Some(Box::new(action));
2✔
NEW
193
        self
×
194
    }
195

196
    /// Puts a handler which will be called on each interaction when no input is detected.
197
    pub fn set_idle_action<F>(&mut self, action: F) -> &mut Self
198
    where
199
        F: FnMut(Context<'_, S, I, O, C>) -> ExpectResult<bool> + 'static,
200
    {
NEW
201
        self.opts.idle_action = Some(Box::new(action));
×
NEW
202
        self
×
203
    }
204
}
205

206
#[cfg(all(unix, not(any(feature = "async", feature = "polling"))))]
207
impl<S, I, O, C> InteractSession<S, I, O, C>
208
where
209
    I: Read,
210
    O: Write,
211
    S: Expect + Termios + Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
212
{
213
    /// Runs the session.
214
    ///
215
    /// See [`Session::interact`].
216
    ///
217
    /// [`Session::interact`]: crate::session::Session::interact
NEW
218
    pub fn spawn(&mut self) -> ExpectResult<bool> {
×
219
        #[cfg(unix)]
220
        {
NEW
221
            let is_echo = self.session.is_echo()?;
×
222
            if !is_echo {
×
NEW
223
                let _ = self.session.set_echo(true);
×
224
            }
225

UNCOV
226
            self.status = None;
×
NEW
227
            let is_alive = interact_buzy_loop(self)?;
×
228

UNCOV
229
            if !is_echo {
×
NEW
230
                let _ = self.session.set_echo(false);
×
231
            }
232

UNCOV
233
            Ok(is_alive)
×
234
        }
235

236
        #[cfg(windows)]
237
        {
NEW
238
            interact_buzy_loop(self)
×
239
        }
240
    }
241
}
242

243
#[cfg(all(windows, not(any(feature = "async", feature = "polling"))))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    I: Read,
    O: Write,
    S: Expect + Healthcheck + NonBlocking + Write + Read,
{
    /// Runs the session.
    ///
    /// Returns `Ok(false)` when the process was found not to be alive.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn(&mut self) -> ExpectResult<bool> {
        // No echo handling here: it is only done by the unix implementation.
        interact_buzy_loop(self)
    }
}
279

280
// NOTE(review): this impl refers to `Session`, `OsProcess`, `BorrowMut` and a
// 3-parameter `InteractSession`, none of which match the visible imports/definitions.
// It looks like it predates the trait based API — confirm whether the `polling`
// feature still builds.
#[cfg(all(unix, feature = "polling", not(feature = "async")))]
impl<S, I, O> InteractSession<&mut Session<OsProcess, S>, I, O>
where
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    S: Write + Read + std::os::unix::io::AsRawFd,
{
    /// Runs the session.
    ///
    /// Echo is switched on for the duration of the interaction (when it was off)
    /// and is switched back off afterwards, including when the interaction ends
    /// with an error.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    {
        let is_echo = self
            .session
            .get_process()
            .get_echo()
            .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
        if !is_echo {
            let _ = self.session.get_process_mut().set_echo(true, None);
        }

        self.status = None;
        // Do not use `?` here: the echo setting must be restored on errors too.
        let result = interact_polling(self, ops.borrow_mut());

        if !is_echo {
            let _ = self.session.get_process_mut().set_echo(false, None);
        }

        result
    }
}
320

321
#[cfg(all(unix, feature = "async"))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
    S: AsyncExpect + Termios + Healthcheck<Status = WaitStatus> + AsyncWrite + AsyncRead + Unpin,
{
    /// Runs the session.
    ///
    /// Echo is switched on for the duration of the interaction (when it was off)
    /// and is switched back off afterwards, including when the interaction ends
    /// with an error.
    ///
    /// Returns `Ok(false)` when the process was found not to be alive; the
    /// observed status is then available via `get_status`.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub async fn spawn(&mut self) -> Result<bool, Error> {
        let is_echo = self.session.is_echo().map_err(Error::IO)?;
        if !is_echo {
            let _ = self.session.set_echo(true);
        }

        // Forget a status left over from a previous run, like the sync versions do.
        self.status = None;
        // Do not use `?` here: the echo setting must be restored on errors too.
        let result = interact_async(self).await;

        if !is_echo {
            let _ = self.session.set_echo(false);
        }

        result
    }
}
348

349
#[cfg(all(windows, feature = "async"))]
impl<S, I, O, C> InteractSession<S, I, O, C>
where
    S: AsyncExpect + Healthcheck + AsyncWrite + AsyncRead + Unpin,
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
{
    /// Runs the session by delegating to the async interact loop.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub async fn spawn(&mut self) -> Result<bool, Error> {
        let outcome = interact_async(self);
        outcome.await
    }
}
365

366
#[cfg(all(windows, feature = "polling", not(feature = "async")))]
impl<I, O> InteractSession<&mut Session, I, O>
where
    I: Read + Clone + Send + 'static,
    O: Write,
{
    /// Runs the session using the thread based polling loop.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
    {
        let options = ops.borrow_mut();
        interact_polling_on_thread(self, options)
    }
}
389

390
impl<S, I, O, C> std::fmt::Debug for InteractSession<S, I, O, C>
391
where
392
    S: std::fmt::Debug,
393
    I: std::fmt::Debug,
394
    O: std::fmt::Debug,
395
    C: std::fmt::Debug,
396
{
397
    #[rustfmt::skip]
NEW
398
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
399
        let mut s = f.debug_struct("InteractSession");
×
NEW
400
        let _ = s.field("session", &self.session)
×
NEW
401
            .field("input", &self.input)
×
NEW
402
            .field("output", &self.output)
×
NEW
403
            .field("escape_character", &self.escape_character);
×
404

405
        #[cfg(unix)]
406
        {
NEW
407
            s.field("status", &self.status);
×
408
        }
409

NEW
410
        let _ = s
×
NEW
411
            .field("state", &std::ptr::addr_of!(self.opts.state))
×
NEW
412
            .field("opts:on_idle", &get_pointer(&self.opts.idle_action))
×
NEW
413
            .field("opts:on_input", &get_pointer(&self.opts.input_action))
×
NEW
414
            .field("opts:on_output", &get_pointer(&self.opts.output_action))
×
NEW
415
            .field("opts:input_filter", &get_pointer(&self.opts.input_filter))
×
NEW
416
            .field("opts:output_filter", &get_pointer(&self.opts.output_filter));
×
417
        
NEW
418
        s.finish()
×
419
    }
420
}
421

422
#[cfg(all(unix, not(feature = "async"), not(feature = "polling")))]
423
fn interact_buzy_loop<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> ExpectResult<bool>
424
where
425
    S: Healthcheck<Status = WaitStatus> + NonBlocking + Write + Read,
426
    O: Write,
427
    I: Read,
428
{
429
    let mut buf = [0; 512];
430

431
    loop {
432
        let status = get_status(&s.session)?;
433
        if !matches!(status, Some(WaitStatus::StillAlive)) {
434
            s.status = status;
435
            return Ok(false);
436
        }
437

438
        if let Some(n) = try_read(&mut s.session, &mut buf)? {
439
            let eof = n == 0;
440
            let buf = &buf[..n];
441
            let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;
442

443
            let exit = run_action_output(s, &buf, eof)?;
444
            if eof || exit {
445
                return Ok(true);
446
            }
447

448
            spin_write(&mut s.output, &buf)?;
449
            spin_flush(&mut s.output)?;
450
        }
451

452
        // We dont't print user input back to the screen.
453
        // In terminal mode it will be ECHOed back automatically.
454
        // This way we preserve terminal seetings for example when user inputs password.
455
        // The terminal must have been prepared before.
456
        match s.input.read(&mut buf) {
457
            Ok(n) => {
458
                let eof = n == 0;
459
                let buf = &buf[..n];
460
                let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;
461

462
                #[rustfmt::skip]
463
                let exit = run_action_input(s, &buf, eof)?;
464
                if eof | exit {
465
                    return Ok(true);
466
                }
467

468
                let escape_char_position = buf.iter().position(|c| *c == s.escape_character);
469
                match escape_char_position {
470
                    Some(pos) => {
471
                        s.session.write_all(&buf[..pos])?;
472
                        return Ok(true);
473
                    }
474
                    None => {
475
                        s.session.write_all(&buf[..])?;
476
                    }
477
                }
478
            }
479
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
480
            Err(err) => return Err(err.into()),
481
        }
482

483
        let exit = run_action_idle(s, &[], false)?;
484
        if exit {
485
            return Ok(true);
486
        }
487
    }
488
}
489

490
/// Busy loop which shuttles bytes between the process and the user (windows).
///
/// Returns `Ok(false)` when the process is no longer alive, and `Ok(true)` when
/// the interaction stops for another reason: EOF, an action asking to exit, or
/// the escape character.
#[cfg(all(windows, not(feature = "async"), not(feature = "polling")))]
fn interact_buzy_loop<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> ExpectResult<bool>
where
    S: Healthcheck + NonBlocking + Write + Read,
    O: Write,
    I: Read,
{
    let mut buf = [0; 512];

    loop {
        let alive = s.session.is_alive()?;
        if !alive {
            return Ok(false);
        }

        // Process -> user.
        if let Some(n) = try_read(&mut s.session, &mut buf)? {
            let eof = n == 0;
            let data = call_filter(s.opts.output_filter.as_mut(), &buf[..n])?;

            let exit = run_action_output(s, &data, eof)?;
            if eof || exit {
                return Ok(true);
            }

            spin_write(&mut s.output, &data)?;
            spin_flush(&mut s.output)?;
        }

        // User -> process.
        //
        // We don't print user input back to the screen.
        // In terminal mode it will be ECHOed back automatically.
        // This way we preserve terminal settings, for example when the user inputs a password.
        // The terminal must have been prepared before.
        match s.input.read(&mut buf) {
            Ok(n) => {
                let eof = n == 0;
                let data = call_filter(s.opts.input_filter.as_mut(), &buf[..n])?;

                let exit = run_action_input(s, &data, eof)?;
                if eof || exit {
                    return Ok(true);
                }

                // Everything before the escape character is still delivered.
                let escape = s.escape_character;
                if let Some(pos) = data.iter().position(|&byte| byte == escape) {
                    s.session.write_all(&data[..pos])?;
                    return Ok(true);
                }

                s.session.write_all(&data[..])?;
            }
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
            Err(err) => return Err(err.into()),
        }

        if run_action_idle(s, &[], false)? {
            return Ok(true);
        }
    }
}
554

555
/// Poll based interact loop (unix, `polling` feature).
///
/// Registers the user's input and the process stream in a poller and reacts to
/// readiness events. Returns `Ok(false)` when the process is no longer reported
/// as alive (the status is stored in `interact.status`), and `Ok(true)` on EOF,
/// when an action asks to exit, or when the escape character is seen.
#[cfg(all(unix, not(feature = "async"), feature = "polling"))]
fn interact_polling<S, O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session<OsProcess, S>, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    S: Write + Read + std::os::unix::io::AsRawFd,
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session<OsProcess, S>, I, O, C>) -> Result<bool, Error>,
{
    use polling::{Event, Poller};

    // Keys which tell the two registered sources apart.
    const INPUT_KEY: usize = 0;
    const PROCESS_KEY: usize = 1;
    // Upper bound for a single wait, so the liveness check and idle action keep running.
    const WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    // Create a poller and register interest in readability of both sources.
    let poller = Poller::new()?;
    poller.add(interact.input.as_raw_fd(), Event::readable(INPUT_KEY))?;
    poller.add(
        interact.session.get_stream().as_raw_fd(),
        Event::readable(PROCESS_KEY),
    )?;

    let mut buf = [0; 512];
    let mut events = Vec::new();

    // The event loop.
    loop {
        match get_status(interact.session)? {
            Some(crate::WaitStatus::StillAlive) => {}
            status => {
                interact.status = status;
                return Ok(false);
            }
        }

        // Wait for at least one I/O event (or the timeout).
        events.clear();
        let _ = poller.wait(&mut events, Some(WAIT_TIMEOUT))?;

        for ev in &events {
            match ev.key {
                INPUT_KEY => {
                    // We don't print user input back to the screen.
                    // In terminal mode it will be ECHOed back automatically.
                    // This way we preserve terminal settings, for example when the user
                    // inputs a password.
                    // The terminal must have been prepared before.
                    match interact.input.read(&mut buf) {
                        Ok(n) => {
                            let eof = n == 0;
                            let data = call_filter(opts.input_filter.as_mut(), &buf[..n])?;

                            let exit = call_action(
                                opts.input_action.as_mut(),
                                interact.session,
                                &mut interact.input,
                                &mut interact.output,
                                &mut opts.state,
                                &data,
                                eof,
                            )?;

                            if eof || exit {
                                return Ok(true);
                            }

                            // Everything before the escape character is still delivered.
                            let escape = interact.escape_character;
                            if let Some(pos) = data.iter().position(|&byte| byte == escape) {
                                interact
                                    .session
                                    .write_all(&data[..pos])
                                    .map_err(Error::IO)?;
                                return Ok(true);
                            }

                            interact.session.write_all(&data[..])?;
                        }
                        Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                        Err(err) => return Err(err.into()),
                    }

                    // Set interest in the next readability event.
                    poller.modify(interact.input.as_raw_fd(), Event::readable(INPUT_KEY))?;
                }
                PROCESS_KEY => {
                    match interact.session.read(&mut buf) {
                        Ok(n) => {
                            let eof = n == 0;
                            let data = call_filter(opts.output_filter.as_mut(), &buf[..n])?;

                            let exit = call_action(
                                opts.output_action.as_mut(),
                                interact.session,
                                &mut interact.input,
                                &mut interact.output,
                                &mut opts.state,
                                &data,
                                eof,
                            )?;

                            if eof || exit {
                                return Ok(true);
                            }

                            spin_write(&mut interact.output, &data)?;
                            spin_flush(&mut interact.output)?;
                        }
                        Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                        Err(err) => return Err(err.into()),
                    }

                    // Set interest in the next readability event.
                    poller.modify(
                        interact.session.get_stream().as_raw_fd(),
                        Event::readable(PROCESS_KEY),
                    )?;
                }
                _ => {}
            }
        }

        let exit = call_action(
            opts.idle_action.as_mut(),
            interact.session,
            &mut interact.input,
            &mut interact.output,
            &mut opts.state,
            &[],
            false,
        )?;

        if exit {
            return Ok(true);
        }
    }
}
690

691
/// Thread based interact loop (windows, `polling` feature).
///
/// Waits on the user's input and a clone of the process stream through `Wait2`
/// and handles what arrives one byte at a time. Returns `Ok(false)` when the
/// process is not alive anymore, and `Ok(true)` on EOF, when an action asks to
/// exit, or when the escape character is seen.
#[cfg(all(windows, not(feature = "async"), feature = "polling"))]
fn interact_polling_on_thread<O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    I: Read + Clone + Send + 'static,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
{
    use crate::{
        error::to_io_error,
        waiter::{Recv, Wait2},
    };

    // Create a waiter over the user's input and a clone of the process stream.
    let stream = interact
        .session
        .get_stream()
        .try_clone()
        .map_err(to_io_error(""))?;
    let mut poller = Wait2::new(interact.input.clone(), stream);

    loop {
        // Stop once the process has exited.
        //
        // The check used to be inverted (`if is_alive`), which ended the
        // interaction right away for a running process; it now matches the
        // busy-loop implementation.
        if !interact.session.is_alive()? {
            return Ok(false);
        }

        // Wait for at least one I/O event.
        let event = poller.recv().map_err(to_io_error(""))?;
        match event {
            // A byte (or EOF) from the user's input.
            Recv::R1(b) => match b {
                Ok(b) => {
                    let buf = b.map_or([0], |b| [b]);
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = &buf[..n];

                    let buf = call_filter(opts.input_filter.as_mut(), buf)?;

                    let exit = call_action(
                        opts.input_action.as_mut(),
                        interact.session,
                        &mut interact.input,
                        &mut interact.output,
                        &mut opts.state,
                        &buf,
                        eof,
                    )?;

                    if eof || exit {
                        return Ok(true);
                    }

                    // todo: replace all of these by 1 by 1 write
                    let escape_char_pos = buf.iter().position(|c| *c == interact.escape_character);
                    match escape_char_pos {
                        Some(pos) => {
                            interact.session.write_all(&buf[..pos])?;
                            return Ok(true);
                        }
                        None => interact.session.write_all(&buf[..])?,
                    }
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            // A byte (or EOF) from the process.
            Recv::R2(b) => match b {
                Ok(b) => {
                    let buf = b.map_or([0], |b| [b]);
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = &buf[..n];

                    let buf = call_filter(opts.output_filter.as_mut(), buf)?;

                    let exit = call_action(
                        opts.output_action.as_mut(),
                        interact.session,
                        &mut interact.input,
                        &mut interact.output,
                        &mut opts.state,
                        &buf,
                        eof,
                    )?;

                    if eof || exit {
                        return Ok(true);
                    }

                    interact.output.write_all(&buf)?;
                    interact.output.flush()?;
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            // Nothing arrived in time: give the idle action a chance to run.
            Recv::Timeout => {
                let exit = call_action(
                    opts.idle_action.as_mut(),
                    interact.session,
                    &mut interact.input,
                    &mut interact.output,
                    &mut opts.state,
                    &[],
                    false,
                )?;

                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
813

814
/// Runs the interactive loop over async IO (unix build).
///
/// Every iteration first asks [`get_status`] for the state of the process. Anything
/// other than `Some(WaitStatus::StillAlive)` is stored in `s.status` and the function
/// returns `Ok(false)`. Otherwise a read from the process, a read from the user input and
/// a `TIMEOUT` delay are raced with `future::or`, and the one that completes is handled:
///
/// * process output: filtered, handed to the output action, then written to `s.output`;
/// * user input: filtered, handed to the input action, then forwarded to the process up
///   to (not including) the escape character;
/// * timeout: the idle action is run with an empty buffer.
///
/// Returns `Ok(true)` on EOF from either side, when an action returns `true`, or when
/// the escape character is found in the input.
///
/// # Errors
///
/// Propagates IO errors from reading/writing and any error returned by a filter or
/// an action. A `WouldBlock` error on the input read is ignored.
#[cfg(all(unix, feature = "async"))]
async fn interact_async<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> Result<bool, Error>
where
    S: Healthcheck<Status = WaitStatus> + AsyncRead + AsyncWrite + Unpin,
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
{
    #[derive(Debug)]
    enum ReadFrom {
        Input,
        Proc,
        Timeout,
    }

    const TIMEOUT: Duration = Duration::from_secs(5);
    let mut input_buf = [0; 512];
    let mut proc_buf = [0; 512];

    loop {
        let status = get_status(&s.session)?;
        if !matches!(status, Some(WaitStatus::StillAlive)) {
            s.status = status;
            return Ok(false);
        }

        let read_process = async { (ReadFrom::Proc, s.session.read(&mut proc_buf).await) };
        let read_input = async { (ReadFrom::Input, s.input.read(&mut input_buf).await) };
        let timeout = async { (ReadFrom::Timeout, async_timeout(TIMEOUT).await) };

        let read_any = future::or(read_process, read_input);
        let read_output = future::or(read_any, timeout).await;
        let read_target = read_output.0;
        let read_result = read_output.1;

        match read_target {
            ReadFrom::Proc => {
                let n = read_result?;
                let eof = n == 0;
                let buf = &proc_buf[..n];
                let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                let exit = run_action_output(s, &buf, eof)?;

                if eof || exit {
                    return Ok(true);
                }

                // `write` may accept only a prefix of the buffer; `write_all` keeps going
                // until every byte of the process output has been handed to `s.output`.
                s.output.write_all(&buf).await?;
                s.output.flush().await?;
            }
            ReadFrom::Input => {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings, for example when the user
                // inputs a password.
                // The terminal must have been prepared before.
                match read_result {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &input_buf[..n];
                        // NOTE(review): this runs the *output* filter on user input;
                        // confirm whether a dedicated input filter was intended here.
                        let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                        let exit = run_action_input(s, &buf, eof)?;

                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                // Forward what precedes the escape character, then leave.
                                s.session.write_all(&buf[..pos]).await?;
                                return Ok(true);
                            }
                            None => s.session.write_all(&buf[..]).await?,
                        }
                    }
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }
            }
            ReadFrom::Timeout => {
                let exit = run_action_idle(s, &[], false)?;
                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
903

904

905
/// Runs the interactive loop over async IO (windows build).
///
/// Every iteration first checks `s.session.is_alive()`; when the process is no longer
/// alive the function returns `Ok(false)`. Otherwise a read from the process, a read from
/// the user input and a `TIMEOUT` delay are raced with `future::or`, and the one that
/// completes is handled:
///
/// * process output: filtered, handed to the output action, then written to `s.output`;
/// * user input: filtered, handed to the input action, then forwarded to the process up
///   to (not including) the escape character;
/// * timeout: the idle action is run with an empty buffer.
///
/// Returns `Ok(true)` on EOF from either side, when an action returns `true`, or when
/// the escape character is found in the input.
///
/// # Errors
///
/// Propagates IO errors from reading/writing and any error returned by a filter or
/// an action. A `WouldBlock` error on the input read is ignored.
#[cfg(all(windows, feature = "async"))]
async fn interact_async<S, O, I, C>(s: &mut InteractSession<S, I, O, C>) -> Result<bool, Error>
where
    S: Healthcheck + AsyncRead + AsyncWrite + Unpin,
    I: AsyncRead + Unpin,
    O: AsyncWrite + Unpin,
{
    #[derive(Debug)]
    enum ReadFrom {
        Input,
        Proc,
        Timeout,
    }

    const TIMEOUT: Duration = Duration::from_secs(5);
    let mut input_buf = [0; 512];
    let mut proc_buf = [0; 512];

    loop {
        if !s.session.is_alive()? {
            return Ok(false);
        }

        let read_process = async { (ReadFrom::Proc, s.session.read(&mut proc_buf).await) };
        let read_input = async { (ReadFrom::Input, s.input.read(&mut input_buf).await) };
        let timeout = async { (ReadFrom::Timeout, async_timeout(TIMEOUT).await) };

        let read_any = future::or(read_process, read_input);
        let read_output = future::or(read_any, timeout).await;
        let read_target = read_output.0;
        let read_result = read_output.1;

        match read_target {
            ReadFrom::Proc => {
                let n = read_result?;
                let eof = n == 0;
                let buf = &proc_buf[..n];
                let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                let exit = run_action_output(s, &buf, eof)?;

                if eof || exit {
                    return Ok(true);
                }

                // `write` may accept only a prefix of the buffer; `write_all` keeps going
                // until every byte of the process output has been handed to `s.output`.
                s.output.write_all(&buf).await?;
                s.output.flush().await?;
            }
            ReadFrom::Input => {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings, for example when the user
                // inputs a password.
                // The terminal must have been prepared before.
                match read_result {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &input_buf[..n];
                        // NOTE(review): this runs the *output* filter on user input;
                        // confirm whether a dedicated input filter was intended here.
                        let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

                        let exit = run_action_input(s, &buf, eof)?;

                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                // Forward what precedes the escape character, then leave.
                                s.session.write_all(&buf[..pos]).await?;
                                return Ok(true);
                            }
                            None => s.session.write_all(&buf[..]).await?,
                        }
                    }
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }
            }
            ReadFrom::Timeout => {
                let exit = run_action_idle(s, &[], false)?;
                if exit {
                    return Ok(true);
                }
            }
        }
    }
}
992

993
/// Waits for `timeout` to elapse and then resolves to `Ok(0)`.
///
/// The return type is the same as that of an async read, so this future can be raced
/// against the reads in `interact_async`.
#[cfg(feature = "async")]
async fn async_timeout(timeout: Duration) -> io::Result<usize> {
    let delay = Delay::new(timeout);
    delay.await;
    Ok(0)
}
998

999
/// Writes all of `buf` to `writer`, retrying for as long as the writer reports
/// `WouldBlock`.
///
/// Progress is tracked across retries: bytes the writer has already accepted are never
/// sent again. (Calling `write_all(buf)` again after a `WouldBlock` would restart from
/// the first byte and duplicate whatever had been written before the error.)
///
/// # Errors
///
/// Returns the first error that is neither `WouldBlock` nor `Interrupted`, or a
/// `WriteZero` error if the writer accepts zero bytes while data is still pending.
fn spin_write<W>(mut writer: W, buf: &[u8]) -> std::io::Result<()>
where
    W: Write,
{
    let mut pending = buf;

    while !pending.is_empty() {
        match writer.write(pending) {
            // Same contract as `write_all`: a zero-length write means no progress is possible.
            Ok(0) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ));
            }
            Ok(written) => pending = &pending[written..],
            // Transient conditions: spin and try again with the same remainder.
            Err(err)
                if matches!(
                    err.kind(),
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted
                ) => {}
            Err(err) => return Err(err),
        }
    }

    Ok(())
}
1011

1012
/// Flushes `writer`, calling `flush` again for as long as it fails with `WouldBlock`.
///
/// The first outcome that is not a `WouldBlock` error (success or any other error)
/// is returned as is.
fn spin_flush<W>(mut writer: W) -> std::io::Result<()>
where
    W: Write,
{
    loop {
        match writer.flush() {
            Err(failure) if failure.kind() == std::io::ErrorKind::WouldBlock => continue,
            outcome => return outcome,
        }
    }
}
1024

1025
#[rustfmt::skip]
1026
fn run_action_input<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
1✔
1027
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof);
1✔
1028
    opt_action(ctx, &mut s.opts.input_action)
1✔
1029
}
1030

1031
#[rustfmt::skip]
1032
fn run_action_output<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
1✔
1033
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof);
1✔
1034
    opt_action(ctx, &mut s.opts.output_action)
1✔
1035
}
1036

1037
#[rustfmt::skip]
NEW
1038
fn run_action_idle<S, I, O, C>(s: &mut InteractSession<S, I, O, C>, buf: &[u8], eof: bool) -> ExpectResult<bool> {
×
NEW
1039
    let ctx = Context::new(&mut s.session, &mut s.input, &mut s.output, &mut s.opts.state, &buf, eof);
×
NEW
1040
    opt_action(ctx, &mut s.opts.idle_action)
×
1041
}
1042

1043
fn opt_action<S, I, O, C>(
1✔
1044
    ctx: Context<'_, S, I, O, C>,
1045
    opt: &mut Option<OptAction<S, I, O, C>>,
1046
) -> ExpectResult<bool> {
1047
    match opt {
1✔
NEW
1048
        Some(action) => (action)(ctx),
×
1049
        None => Ok(false),
1050
    }
1051
}
1052

1053
fn call_filter<F>(filter: Option<F>, buf: &[u8]) -> Result<Cow<'_, [u8]>, Error>
1✔
1054
where
1055
    F: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
1056
{
1057
    match filter {
1✔
UNCOV
1058
        Some(mut action) => (action)(buf),
×
1059
        None => Ok(Cow::Borrowed(buf)),
1✔
1060
    }
1061
}
1062

1063
#[cfg(unix)]
1064
fn get_status<S>(session: &S) -> Result<Option<S::Status>, Error>
2✔
1065
where
1066
    S: Healthcheck,
1067
{
1068
    match session.get_status() {
2✔
1069
        Ok(status) => Ok(Some(status)),
2✔
NEW
1070
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
×
NEW
1071
        Err(err) => Err(Error::IO(err)),
×
1072
    }
1073
}
1074

1075
#[cfg(not(feature = "async"))]
1076
fn try_read<S>(session: &mut S, buf: &mut [u8]) -> ExpectResult<Option<usize>>
1077
where
1078
    S: NonBlocking + Read,
1079
{
1080
    session.set_blocking(false)?;
1081

1082
    let result = session.read(buf);
1083

1084
    session.set_blocking(true)?;
1085

1086
    match result {
1087
        Ok(n) => Ok(Some(n)),
1088
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
1089
        Err(err) => Err(Error::IO(err)),
1090
    }
1091
}
1092

1093
/// Returns the address of the value owned by the box, or `0` when `ptr` is `None`.
///
/// The address is returned as a plain integer; for unsized `T` only the data address
/// is kept and the pointer metadata is discarded.
fn get_pointer<T>(ptr: &Option<Box<T>>) -> usize
where
    T: ?Sized,
{
    // Take the address of the boxed value itself. `addr_of!` on the closure argument
    // would yield the address of a temporary reference on the stack instead.
    ptr.as_deref()
        .map_or(0, |value| value as *const T as *const () as usize)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc