• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zhiburt / expectrl / 4343058004

pending completion
4343058004

push

github

GitHub
Change interact callbacks to be able to stop (#58)

12 of 12 new or added lines in 2 files covered. (100.0%)

1479 of 2567 relevant lines covered (57.62%)

3.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.72
/src/interact/session.rs
1
//! This module contains an [`InteractSession`] which runs an interactive session with IO.
2

3
use std::{
4
    borrow::{BorrowMut, Cow},
5
    io::{ErrorKind, Write},
6
};
7

8
use crate::{session::Proc, Error, Session};
9

10
#[cfg(not(feature = "async"))]
11
use std::io::Read;
12

13
use super::{Context, InteractOptions};
14
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
15
use crate::process::NonBlocking;
16

17
/// InteractSession bundles a session with the input and output used to run an interactive session.
///
/// It also stores the escape character and, on unix, the status recorded when the
/// process was found to be no longer alive.
18
#[derive(Debug)]
19
pub struct InteractSession<Session, Input, Output> {
20
    // The session that is being interacted with.
    session: Session,
21
    // The source user input is read from.
    input: Input,
22
    // The sink the session output is written to.
    output: Output,
23
    // Byte which stops the interaction once it is seen in the input.
    escape_character: u8,
24
    // Status stored by the interact loops when the process is not alive anymore.
    #[cfg(unix)]
25
    status: Option<crate::WaitStatus>,
26
}
27

28
impl<S, I, O> InteractSession<S, I, O> {
29
    /// Default escape character.
30
    pub const ESCAPE: u8 = 29; // Ctrl-]
31

32
    /// Creates a new object of [InteractSession].
33
    pub fn new(session: S, input: I, output: O) -> InteractSession<S, I, O> {
4✔
34
        InteractSession {
35
            input,
36
            output,
37
            session,
38
            escape_character: Self::ESCAPE,
39
            #[cfg(unix)]
40
            status: None,
41
        }
42
    }
43

44
    /// Sets an escape character after seen which the interact interactions will be stopped
45
    /// and controll will be returned to a caller process.
46
    pub fn set_escape_character(mut self, c: u8) -> Self {
×
47
        self.escape_character = c;
×
48
        self
×
49
    }
50

51
    /// Returns a status of spawned session if it was exited.
52
    ///
53
    /// If [`Self::spawn`] returns false but this method returns None it means that a child process was shutdown by various reasons.
54
    /// Which sometimes happens and it's not considered to be a valid [`WaitStatus`], so None is returned.
55
    ///
56
    /// [`Self::spawn`]: crate::interact::InteractSession::spawn
57
    /// [`WaitStatus`]: crate::WaitStatus
58
    #[cfg(unix)]
59
    pub fn get_status(&self) -> Option<crate::WaitStatus> {
×
60
        self.status
×
61
    }
62
}
63

64
#[cfg(not(any(feature = "async", feature = "polling")))]
65
impl<S, I, O> InteractSession<&mut Session<Proc, S>, I, O>
66
where
67
    I: Read,
68
    O: Write,
69
    S: NonBlocking + Write + Read,
70
{
71
    /// Runs the session.
72
    ///
73
    /// See [`Session::interact`].
74
    ///
75
    /// [`Session::interact`]: crate::session::Session::interact
76
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
7✔
77
    where
78
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
79
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
80
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
81
        IA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
82
        OA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
83
        WA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
84
    {
85
        #[cfg(unix)]
86
        {
87
            let is_echo = self
14✔
88
                .session
×
89
                .get_echo()
90
                .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
×
91
            if !is_echo {
7✔
92
                let _ = self.session.set_echo(true, None);
7✔
93
            }
94

95
            self.status = None;
7✔
96
            let is_alive = interact_buzy_loop(self, ops.borrow_mut())?;
14✔
97

98
            if !is_echo {
7✔
99
                let _ = self.session.set_echo(false, None);
7✔
100
            }
101

102
            Ok(is_alive)
7✔
103
        }
104

105
        #[cfg(windows)]
106
        {
107
            interact_buzy_loop(self, ops.borrow_mut())
×
108
        }
109
    }
110
}
111

112
#[cfg(all(unix, feature = "polling", not(feature = "async")))]
impl<S, I, O> InteractSession<&mut Session<Proc, S>, I, O>
where
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    S: Write + Read + std::os::unix::io::AsRawFd,
{
    /// Runs the session.
    ///
    /// Echo is switched on for the duration of the interaction when it was off,
    /// and switched back off afterwards, including when the interaction fails.
    ///
    /// Returns `Ok(false)` when the process was found to be no longer alive and
    /// `Ok(true)` when the interaction was stopped for another reason.
    ///
    /// # Errors
    ///
    /// Returns an error if the echo setting can't be obtained or if the interaction fails.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
    {
        let is_echo = self
            .session
            .get_echo()
            .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
        if !is_echo {
            let _ = self.session.set_echo(true, None);
        }

        self.status = None;
        // Keep the result instead of using `?` so that echo is restored on errors too.
        let result = interact_polling(self, ops.borrow_mut());

        if !is_echo {
            let _ = self.session.set_echo(false, None);
        }

        result
    }
}
151

152
#[cfg(feature = "async")]
impl<S, I, O> InteractSession<&mut Session<Proc, S>, I, O>
where
    I: futures_lite::AsyncRead + Unpin,
    O: Write,
    S: futures_lite::AsyncRead + futures_lite::AsyncWrite + Unpin,
{
    /// Runs the session.
    ///
    /// On unix, echo is switched on for the duration of the interaction when it was off,
    /// and switched back off afterwards, including when the interaction fails.
    ///
    /// Returns `Ok(false)` when the process was found to be no longer alive and
    /// `Ok(true)` when the interaction was stopped for another reason.
    ///
    /// # Errors
    ///
    /// Returns an error if the echo setting can't be obtained or if the interaction fails.
    ///
    /// See [`Session::interact`].
    ///
    /// [`Session::interact`]: crate::session::Session::interact
    pub async fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut opts: OPS) -> Result<bool, Error>
    where
        OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
        IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
        IA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
        OA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
        WA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
    {
        #[cfg(unix)]
        {
            let is_echo = self
                .session
                .get_echo()
                .map_err(|e| Error::unknown("failed to get echo", e.to_string()))?;
            if !is_echo {
                let _ = self.session.set_echo(true, None);
            }

            // Drop a status left over from a previous run, like the sync variants do.
            self.status = None;
            // Keep the result instead of using `?` so that echo is restored on errors too.
            let result = interact_async(self, opts.borrow_mut()).await;

            if !is_echo {
                let _ = self.session.set_echo(false, None);
            }

            result
        }

        #[cfg(windows)]
        {
            interact_async(self, opts.borrow_mut()).await
        }
    }
}
198

199
// NOTE(review): this impl looks out of sync with the rest of the file — `InteractSession`
// is declared above with three type parameters and without `state`, `*_filter` or
// `*_action` fields, and the `S` bound names `std::os::unix` under `cfg(windows)`.
// TODO confirm whether it still builds with `polling` enabled on Windows.
#[cfg(all(windows, feature = "polling", not(feature = "async")))]
200
impl<S, I, O, C> InteractSession<Session<Proc, S>, I, O, C>
201
where
202
    I: Read + Send + 'static,
203
    O: Write,
204
    S: Write + Read + std::os::unix::io::AsRawFd,
205
{
206
    /// Runs the session.
207
    ///
    /// Consumes `self`, hands its parts to `interact_polling_on_thread`
    /// and returns `self.state` once that call succeeds.
    ///
208
    /// See [`Session::interact`].
209
    ///
210
    /// [`Session::interact`]: crate::session::Session::interact
211
    pub fn spawn(mut self) -> Result<bool, Error> {
×
212
        interact_polling_on_thread(
213
            self.session,
×
214
            self.output,
×
215
            self.input,
×
216
            &mut self.state,
×
217
            self.escape_character,
×
218
            self.input_filter,
×
219
            self.output_filter,
×
220
            self.input_action,
×
221
            self.output_action,
×
222
            self.idle_action,
×
223
        )?;
224

225
        Ok(self.state)
×
226
    }
227
}
228

229
#[cfg(all(not(feature = "async"), not(feature = "polling")))]
230
fn interact_buzy_loop<S, O, I, C, IF, OF, IA, OA, WA>(
7✔
231
    interact: &mut InteractSession<&mut Session<Proc, S>, I, O>,
232
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
233
) -> Result<bool, Error>
234
where
235
    S: NonBlocking + Write + Read,
236
    I: Read,
237
    O: Write,
238
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
239
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
240
    IA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
241
    OA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
242
    WA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
243
{
244
    let mut buf = [0; 512];
7✔
245
    loop {
246
        #[cfg(unix)]
247
        {
248
            let status = get_status(interact.session)?;
7✔
249
            if !matches!(status, Some(crate::WaitStatus::StillAlive)) {
13✔
250
                interact.status = status;
3✔
251
                return Ok(false);
3✔
252
            }
253
        }
254

255
        #[cfg(windows)]
256
        {
257
            if !interact.session.is_alive() {
258
                return Ok(false);
259
            }
260
        }
261

262
        match interact.session.try_read(&mut buf) {
4✔
263
            Ok(n) => {
4✔
264
                let eof = n == 0;
4✔
265
                let buf = &buf[..n];
8✔
266
                let buf = call_filter(opts.output_filter.as_mut(), buf)?;
4✔
267

268
                let exit = call_action(
269
                    opts.output_action.as_mut(),
4✔
270
                    interact.session,
4✔
271
                    &mut interact.input,
4✔
272
                    &mut interact.output,
4✔
273
                    &mut opts.state,
×
274
                    &buf,
4✔
275
                    eof,
276
                )?;
277

278
                if eof || exit {
8✔
279
                    return Ok(true);
×
280
                }
281

282
                spin_write(&mut interact.output, &buf)?;
8✔
283
                spin_flush(&mut interact.output)?;
8✔
284
            }
285
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
12✔
286
            Err(err) => return Err(err.into()),
×
287
        }
288

289
        // We dont't print user input back to the screen.
290
        // In terminal mode it will be ECHOed back automatically.
291
        // This way we preserve terminal seetings for example when user inputs password.
292
        // The terminal must have been prepared before.
293
        match interact.input.read(&mut buf) {
4✔
294
            Ok(n) => {
4✔
295
                let eof = n == 0;
4✔
296
                let buf = &buf[..n];
8✔
297
                let buf = call_filter(opts.input_filter.as_mut(), buf)?;
4✔
298

299
                let exit = call_action(
300
                    opts.input_action.as_mut(),
4✔
301
                    interact.session,
4✔
302
                    &mut interact.input,
4✔
303
                    &mut interact.output,
4✔
304
                    &mut opts.state,
×
305
                    &buf,
4✔
306
                    eof,
307
                )?;
308

309
                if eof | exit {
4✔
310
                    return Ok(true);
3✔
311
                }
312

313
                let escape_char_position = buf.iter().position(|c| *c == interact.escape_character);
16✔
314
                match escape_char_position {
4✔
315
                    Some(pos) => {
1✔
316
                        interact.session.write_all(&buf[..pos])?;
1✔
317
                        return Ok(true);
1✔
318
                    }
319
                    None => {
320
                        interact.session.write_all(&buf[..])?;
8✔
321
                    }
322
                }
323
            }
324
            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
9✔
325
            Err(err) => return Err(err.into()),
×
326
        }
327

328
        let exit = call_action(
329
            opts.idle_action.as_mut(),
4✔
330
            interact.session,
4✔
331
            &mut interact.input,
4✔
332
            &mut interact.output,
4✔
333
            &mut opts.state,
×
334
            &[],
335
            false,
336
        )?;
337

338
        if exit {
4✔
339
            return Ok(true);
×
340
        }
341
    }
342
}
343

344
/// Drives the interaction using a poller over the input and the process stream.
///
/// Key `0` is registered for `interact.input` and key `1` for the process stream.
/// After each wait (5 second timeout) the ready sides are read, filtered, passed to
/// their action and forwarded; the idle action is called at the end of every iteration.
///
/// Returns `Ok(false)` when the process is no longer alive, and `Ok(true)` on EOF of either
/// side, when an action returns `true`, or when the escape character is met in the input.
#[cfg(all(unix, not(feature = "async"), feature = "polling"))]
fn interact_polling<S, O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session<Proc, S>, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    S: Write + Read + std::os::unix::io::AsRawFd,
    I: Read + std::os::unix::io::AsRawFd,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
{
    use polling::{Event, Poller};

    // Create a poller and register interest in readability of the input and the process stream.
    let poller = Poller::new()?;
    poller.add(interact.input.as_raw_fd(), Event::readable(0))?;
    poller.add(
        interact.session.get_stream().as_raw_fd(),
        Event::readable(1),
    )?;

    let mut buf = [0; 512];

    // The event loop.
    let mut events = Vec::new();
    loop {
        let status = get_status(interact.session)?;
        if !matches!(status, Some(crate::WaitStatus::StillAlive)) {
            interact.status = status;
            return Ok(false);
        }

        // Wait for at least one I/O event.
        events.clear();
        let _ = poller.wait(&mut events, Some(std::time::Duration::from_secs(5)))?;

        for ev in &events {
            if ev.key == 0 {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings for example when user inputs password.
                // The terminal must have been prepared before.
                match interact.input.read(&mut buf) {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &buf[..n];
                        let buf = call_filter(opts.input_filter.as_mut(), buf)?;

                        let exit = call_action(
                            opts.input_action.as_mut(),
                            interact.session,
                            &mut interact.input,
                            &mut interact.output,
                            &mut opts.state,
                            &buf,
                            eof,
                        )?;

                        // An action returning `true` stops the interaction.
                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos =
                            buf.iter().position(|c| *c == interact.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                interact.session.write_all(&buf[..pos]).map_err(Error::IO)?;
                                return Ok(true);
                            }
                            None => interact.session.write_all(&buf[..])?,
                        }
                    }
                    Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }

                // Set interest in the next readability event.
                poller.modify(interact.input.as_raw_fd(), Event::readable(0))?;
            }

            if ev.key == 1 {
                match interact.session.read(&mut buf) {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &buf[..n];
                        let buf = call_filter(opts.output_filter.as_mut(), buf)?;

                        let exit = call_action(
                            opts.output_action.as_mut(),
                            interact.session,
                            &mut interact.input,
                            &mut interact.output,
                            &mut opts.state,
                            &buf,
                            eof,
                        )?;

                        // An action returning `true` stops the interaction.
                        if eof || exit {
                            return Ok(true);
                        }

                        spin_write(&mut interact.output, &buf)?;
                        spin_flush(&mut interact.output)?;
                    }
                    Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }

                // Set interest in the next readability event.
                poller.modify(
                    interact.session.get_stream().as_raw_fd(),
                    Event::readable(1),
                )?;
            }
        }

        let exit = call_action(
            opts.idle_action.as_mut(),
            interact.session,
            &mut interact.input,
            &mut interact.output,
            &mut opts.state,
            &[],
            false,
        )?;

        if exit {
            return Ok(true);
        }
    }
}
475

476
/// Drives the interaction on Windows by waiting on the input and a clone of the process stream.
///
/// `Recv::R1` events are handled as user input (filtered, passed to `input_action` and
/// written to the session), `Recv::R2` events as process output (filtered, passed to
/// `output_action` and written to `output`), and `Recv::Timeout` calls `idle_action`.
///
/// Returns `Ok(false)` when the session reports it is not alive, and `Ok(true)` on EOF of
/// either side or when the escape character is met in the input.
///
/// NOTE(review): `Context::new` is called here with a different argument list than in
/// `call_action`, and the actions return `()` rather than `bool`; this function looks
/// out of sync with the other interact loops — TODO confirm against `Context`.
#[cfg(all(windows, not(feature = "async"), feature = "polling"))]
fn interact_polling_on_thread<
    State,
    Output,
    Input,
    InputFilter,
    OutputFilter,
    InputAction,
    OutputAction,
    IdleAction,
>(
    session: &mut Session,
    mut output: Output,
    input: Input,
    state: &mut State,
    escape_character: u8,
    mut input_filter: Option<InputFilter>,
    mut output_filter: Option<OutputFilter>,
    mut input_action: Option<InputAction>,
    mut output_action: Option<OutputAction>,
    mut idle_action: Option<IdleAction>,
) -> Result<bool, Error>
where
    Input: Read + Send + 'static,
    Output: Write,
    InputFilter: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OutputFilter: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    InputAction: FnMut(Context<'_, &mut Session, &mut Output, &mut State>) -> Result<(), Error>,
    OutputAction: FnMut(Context<'_, &mut Session, &mut Output, &mut State>) -> Result<(), Error>,
    IdleAction: FnMut(Context<'_, &mut Session, &mut Output, &mut State>) -> Result<(), Error>,
{
    use crate::{
        error::to_io_error,
        waiter::{Recv, Wait2},
    };

    // Clone the process stream and wait on both the user input and the stream.
    let stream = session.get_stream().try_clone().map_err(to_io_error(""))?;
    let mut poller = Wait2::new(input, stream);

    loop {
        // In case the process exits we still try to
        // fill the buffer to run callbacks if there was something in it.
        //
        // We ignore errors because there might be errors like ECHILD etc.
        let status = session.is_alive();
        if matches!(status, Ok(false)) {
            return Ok(false);
        }

        // Wait for at least one I/O event.
        let event = poller.recv().map_err(to_io_error(""))?;
        match event {
            Recv::R1(b) => match b {
                Ok(b) => {
                    // A single byte is received at a time; `None` marks EOF.
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = b.map_or([0], |b| [b]);
                    let buf = &buf[..n];

                    let buf = if let Some(filter) = input_filter.as_mut() {
                        (filter)(buf)?
                    } else {
                        Cow::Borrowed(buf)
                    };

                    if let Some(action) = input_action.as_mut() {
                        let ctx = Context::new(&mut *session, &mut output, &buf, eof, &mut *state);
                        (action)(ctx)?;
                    }

                    if eof {
                        return Ok(true);
                    }

                    let escape_char_pos = buf.iter().position(|c| *c == escape_character);
                    match escape_char_pos {
                        Some(pos) => {
                            session.write_all(&buf[..pos])?;
                            return Ok(true);
                        }
                        None => session.write_all(&buf[..])?,
                    }
                }
                // `ErrorKind` is imported at the top of the file; there is no `io` name in scope.
                Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            Recv::R2(b) => match b {
                Ok(b) => {
                    // A single byte is received at a time; `None` marks EOF.
                    let eof = b.is_none();
                    let n = if eof { 0 } else { 1 };
                    let buf = b.map_or([0], |b| [b]);
                    let buf = &buf[..n];

                    let buf = if let Some(filter) = output_filter.as_mut() {
                        (filter)(buf)?
                    } else {
                        Cow::Borrowed(buf)
                    };

                    if let Some(action) = output_action.as_mut() {
                        let ctx = Context::new(&mut *session, &mut output, &buf, eof, &mut *state);
                        (action)(ctx)?;
                    }

                    if eof {
                        return Ok(true);
                    }

                    output.write_all(&buf)?;
                    output.flush()?;
                }
                Err(err) if err.kind() == ErrorKind::WouldBlock => {}
                Err(err) => return Err(err.into()),
            },
            Recv::Timeout => {
                if let Some(action) = idle_action.as_mut() {
                    let ctx = Context::new(&mut *session, &mut output, &[], false, &mut *state);
                    (action)(ctx)?;
                }
            }
        }
    }
}
600

601
/// Drives the interaction asynchronously by racing a process read, an input read and a timeout.
///
/// Process output is filtered with `output_filter`, passed to `output_action` and written to
/// `interact.output`. Input is filtered with `input_filter`, passed to `input_action` and
/// written to the process. When the 5 second timeout wins, `idle_action` is called.
///
/// Returns `Ok(false)` when the process is no longer alive, and `Ok(true)` on EOF of either
/// side, when an action returns `true`, or when the escape character is met in the input.
#[cfg(feature = "async")]
async fn interact_async<S, O, I, C, IF, OF, IA, OA, WA>(
    interact: &mut InteractSession<&mut Session<Proc, S>, I, O>,
    opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
) -> Result<bool, Error>
where
    S: futures_lite::AsyncRead + futures_lite::AsyncWrite + Unpin,
    I: futures_lite::AsyncRead + Unpin,
    O: Write,
    IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
    IA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
    OA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
    WA: FnMut(Context<'_, Session<Proc, S>, I, O, C>) -> Result<bool, Error>,
{
    use std::io;

    use futures_lite::{AsyncReadExt, AsyncWriteExt};

    let mut stdin_buf = [0; 512];
    let mut proc_buf = [0; 512];
    loop {
        #[cfg(unix)]
        {
            let status = get_status(interact.session)?;
            if !matches!(status, Some(crate::WaitStatus::StillAlive)) {
                interact.status = status;
                return Ok(false);
            }
        }

        #[cfg(windows)]
        {
            if !interact.session.is_alive() {
                return Ok(false);
            }
        }

        #[derive(Debug)]
        enum ReadFrom {
            Stdin,
            Process,
            Timeout,
        }

        let read_process = async {
            (
                ReadFrom::Process,
                interact.session.read(&mut proc_buf).await,
            )
        };
        let read_stdin = async { (ReadFrom::Stdin, interact.input.read(&mut stdin_buf).await) };
        let timeout = async {
            (
                ReadFrom::Timeout,
                async {
                    futures_timer::Delay::new(std::time::Duration::from_secs(5)).await;
                    io::Result::Ok(0)
                }
                .await,
            )
        };

        let read_fut = futures_lite::future::or(read_process, read_stdin);
        let (read_from, result) = futures_lite::future::or(read_fut, timeout).await;

        match read_from {
            ReadFrom::Process => {
                let n = result?;
                let eof = n == 0;
                let buf = &proc_buf[..n];
                let buf = call_filter(opts.output_filter.as_mut(), buf)?;

                let exit = call_action(
                    opts.output_action.as_mut(),
                    interact.session,
                    &mut interact.input,
                    &mut interact.output,
                    &mut opts.state,
                    &buf,
                    eof,
                )?;

                // An action returning `true` stops the interaction.
                if eof || exit {
                    return Ok(true);
                }

                spin_write(&mut interact.output, &buf)?;
                spin_flush(&mut interact.output)?;
            }
            ReadFrom::Stdin => {
                // We don't print user input back to the screen.
                // In terminal mode it will be ECHOed back automatically.
                // This way we preserve terminal settings for example when user inputs password.
                // The terminal must have been prepared before.
                match result {
                    Ok(n) => {
                        let eof = n == 0;
                        let buf = &stdin_buf[..n];
                        // User input goes through the input filter, not the output one.
                        let buf = call_filter(opts.input_filter.as_mut(), buf)?;

                        let exit = call_action(
                            opts.input_action.as_mut(),
                            interact.session,
                            &mut interact.input,
                            &mut interact.output,
                            &mut opts.state,
                            &buf,
                            eof,
                        )?;

                        // An action returning `true` stops the interaction.
                        if eof || exit {
                            return Ok(true);
                        }

                        let escape_char_pos =
                            buf.iter().position(|c| *c == interact.escape_character);
                        match escape_char_pos {
                            Some(pos) => {
                                interact.session.write_all(&buf[..pos]).await?;
                                return Ok(true);
                            }
                            None => interact.session.write_all(&buf[..]).await?,
                        }
                    }
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                    Err(err) => return Err(err.into()),
                }
            }
            ReadFrom::Timeout => {
                let exit = call_action(
                    opts.idle_action.as_mut(),
                    interact.session,
                    &mut interact.input,
                    &mut interact.output,
                    &mut opts.state,
                    &[],
                    false,
                )?;

                if exit {
                    return Ok(true);
                }

                // The next iteration checks whether the process is still alive.
            }
        }
    }
}
747

748
/// Writes the whole `buf` to `writer`, retrying while the writer reports `WouldBlock`.
///
/// Progress is tracked across retries, so bytes which were accepted before a
/// `WouldBlock` are not sent a second time.
///
/// # Errors
///
/// Returns the first error which is neither `WouldBlock` nor `Interrupted`,
/// or a `WriteZero` error if the writer stops accepting bytes.
fn spin_write<W>(mut writer: W, mut buf: &[u8]) -> std::io::Result<()>
where
    W: Write,
{
    while !buf.is_empty() {
        match writer.write(buf) {
            Ok(0) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ))
            }
            // Only the remaining tail is retried.
            Ok(n) => buf = &buf[n..],
            Err(err)
                if matches!(
                    err.kind(),
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted
                ) => {}
            Err(err) => return Err(err),
        }
    }

    Ok(())
}
760

761
/// Flushes `writer`, calling `flush` again for as long as it reports `WouldBlock`.
///
/// # Errors
///
/// Returns the first error whose kind is not `WouldBlock`.
fn spin_flush<W>(mut writer: W) -> std::io::Result<()>
where
    W: Write,
{
    while let Err(err) = writer.flush() {
        if err.kind() != std::io::ErrorKind::WouldBlock {
            return Err(err);
        }
    }

    Ok(())
}
773

774
fn call_action<F, S, I, O, C>(
9✔
775
    action: Option<F>,
776
    s: &mut S,
777
    r: &mut I,
778
    w: &mut O,
779
    state: &mut C,
780
    buf: &[u8],
781
    eof: bool,
782
) -> Result<bool, Error>
783
where
784
    F: FnMut(Context<'_, S, I, O, C>) -> Result<bool, Error>,
785
{
786
    match action {
9✔
787
        Some(mut action) => (action)(Context::new(s, r, w, state, buf, eof)),
12✔
788
        None => Ok(false),
789
    }
790
}
791

792
fn call_filter<F>(filter: Option<F>, buf: &[u8]) -> Result<Cow<'_, [u8]>, Error>
3✔
793
where
794
    F: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
795
{
796
    match filter {
3✔
797
        Some(mut action) => (action)(buf),
4✔
798
        None => Ok(Cow::Borrowed(buf)),
1✔
799
    }
800
}
801

802
#[cfg(unix)]
803
fn get_status<S>(session: &Session<Proc, S>) -> Result<Option<crate::WaitStatus>, Error> {
2✔
804
    match session.status() {
3✔
805
        Ok(status) => Ok(Some(status)),
2✔
806
        Err(ptyprocess::errno::Errno::ECHILD | ptyprocess::errno::Errno::ESRCH) => Ok(None),
1✔
807
        Err(err) => Err(Error::IO(std::io::Error::new(ErrorKind::Other, err))),
×
808
    }
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc