• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 14667842019

25 Apr 2025 03:14PM UTC coverage: 73.861% (-0.03%) from 73.889%
14667842019

push

github

tesuji
use log.exception inside except block

3827 of 6452 branches covered (59.31%)

0 of 1 new or added line in 1 file covered. (0.0%)

5 existing lines in 2 files now uncovered.

13377 of 18111 relevant lines covered (73.86%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.11
/pwnlib/tubes/tube.py
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import
1✔
3
from __future__ import division
1✔
4

5
import abc
1✔
6
import logging
1✔
7
import os
1✔
8
import re
1✔
9
import string
1✔
10
import subprocess
1✔
11
import sys
1✔
12
import threading
1✔
13
import time
1✔
14

15
from pwnlib import atexit
1✔
16
from pwnlib import term
1✔
17
from pwnlib.context import context
1✔
18
from pwnlib.log import Logger
1✔
19
from pwnlib.timeout import Timeout
1✔
20
from pwnlib.tubes.buffer import Buffer
1✔
21
from pwnlib.util import fiddling
1✔
22
from pwnlib.util import iters
1✔
23
from pwnlib.util import misc
1✔
24
from pwnlib.util import packing
1✔
25

26

27
class tube(Timeout, Logger):
1✔
28
    """
29
    Container of all the tube functions common to sockets, TTYs and SSH connections.
30
    """
31

32
    default = Timeout.default
1✔
33
    forever = Timeout.forever
1✔
34

35
    def __init__(self, timeout = default, level = None, *a, **kw):
1✔
36
        super(tube, self).__init__(timeout)
1✔
37

38
        Logger.__init__(self, None)
1✔
39
        if level is not None:
1✔
40
            self.setLevel(level)
1✔
41

42
        self.buffer = Buffer(*a, **kw)
1✔
43
        self._newline = None
1✔
44
        atexit.register(self.close)
1✔
45

46
    def _normalize_keepends_drop(self, keepends, drop, drop_default):
1✔
47
        '''
48
        >>> t = tube()
49
        >>> t._normalize_keepends_drop(None, None, True)
50
        True
51
        >>> t._normalize_keepends_drop(None, None, False)
52
        False
53
        >>> t._normalize_keepends_drop(None, True, True)
54
        True
55
        >>> t._normalize_keepends_drop(None, True, False)
56
        True
57
        >>> t._normalize_keepends_drop(True, None, True)
58
        False
59
        >>> t._normalize_keepends_drop(True, None, False)
60
        False
61
        >>> t._normalize_keepends_drop(None, False, True)
62
        False
63
        >>> t._normalize_keepends_drop(None, False, False)
64
        False
65
        >>> t._normalize_keepends_drop(False, None, True)
66
        True
67
        >>> t._normalize_keepends_drop(False, None, False)
68
        True
69
        >>> t._normalize_keepends_drop(False, True, False)
70
        Traceback (most recent call last):
71
            ...
72
        pwnlib.exception.PwnlibException: 'drop' and 'keepends' arguments cannot be used together.
73
        '''
74
        if keepends is not None:
1✔
75
            self.warn_once("'keepends' argument is deprecated. Use 'drop' instead.")
1✔
76
        if drop is None and keepends is None:
1✔
77
            return drop_default
1✔
78
        elif drop is not None:
1✔
79
            if keepends is not None:
1✔
80
                self.error("'drop' and 'keepends' arguments cannot be used together.")
1✔
81
            return drop
1✔
82
        return not keepends
1✔
83

84
    @property
1✔
85
    def newline(self):
1✔
86
        r'''Character sent with methods like sendline() or used for recvline().
87

88
            >>> t = tube()
89
            >>> t.newline = b'X'
90
            >>> t.unrecv(b'A\nB\nCX')
91
            >>> t.recvline()
92
            b'A\nB\nCX'
93

94
            >>> t = tube()
95
            >>> context.newline = b'\r\n'
96
            >>> t.newline
97
            b'\r\n'
98

99
            # Clean up
100
            >>> context.clear()
101
        '''
102
        if self._newline is not None:
1✔
103
            return self._newline
1✔
104
        return context.newline
1✔
105

106
    @newline.setter
1✔
107
    def newline(self, newline):
1✔
108
        self._newline = packing._need_bytes(newline)
1✔
109

110
    # Functions based on functions from subclasses
111
    def recv(self, numb = None, timeout = default):
1✔
112
        r"""recv(numb = 4096, timeout = default) -> bytes
113

114
        Receives up to `numb` bytes of data from the tube, and returns
115
        as soon as any quantity of data is available.
116

117
        If the request is not satisfied before ``timeout`` seconds pass,
118
        all data is buffered and an empty string (``''``) is returned.
119

120
        Raises:
121
            exceptions.EOFError: The connection is closed
122

123
        Returns:
124
            A bytes object containing bytes received from the socket,
125
            or ``''`` if a timeout occurred while waiting.
126

127
        Examples:
128

129
            >>> t = tube()
130
            >>> # Fake a data source
131
            >>> t.recv_raw = lambda n: b'Hello, world'
132
            >>> t.recv() == b'Hello, world'
133
            True
134
            >>> t.unrecv(b'Woohoo')
135
            >>> t.recv() == b'Woohoo'
136
            True
137
            >>> with context.local(log_level='debug'):
138
            ...    _ = t.recv()
139
            [...] Received 0xc bytes:
140
                b'Hello, world'
141
        """
142
        numb = self.buffer.get_fill_size(numb)
1✔
143
        return self._recv(numb, timeout) or b''
1✔
144

145
    def unrecv(self, data):
1✔
146
        """unrecv(data)
147

148
        Puts the specified data back at the beginning of the receive
149
        buffer.
150

151
        Examples:
152

153
            >>> t = tube()
154
            >>> t.recv_raw = lambda n: b'hello'
155
            >>> t.recv()
156
            b'hello'
157
            >>> t.recv()
158
            b'hello'
159
            >>> t.unrecv(b'world')
160
            >>> t.recv()
161
            b'world'
162
            >>> t.recv()
163
            b'hello'
164
        """
165
        data = packing._need_bytes(data)
1✔
166
        self.buffer.unget(data)
1✔
167

168
    def _fillbuffer(self, timeout = default):
1✔
169
        """_fillbuffer(timeout = default)
170

171
        Fills the internal buffer from the pipe, by calling
172
        :meth:`recv_raw` exactly once.
173

174
        Returns:
175

176
            The bytes of data received, or ``''`` if no data was received.
177

178
        Examples:
179

180
            >>> t = tube()
181
            >>> t.recv_raw = lambda *a: b'abc'
182
            >>> len(t.buffer)
183
            0
184
            >>> t._fillbuffer()
185
            b'abc'
186
            >>> len(t.buffer)
187
            3
188
        """
189
        data = b''
1✔
190

191
        with self.local(timeout):
1✔
192
            data = self.recv_raw(self.buffer.get_fill_size())
1✔
193

194
        if data and self.isEnabledFor(logging.DEBUG):
1✔
195
            self.debug('Received %#x bytes:' % len(data))
1✔
196
            self.maybe_hexdump(data, level=logging.DEBUG)
1✔
197
        if data:
1✔
198
            self.buffer.add(data)
1✔
199

200
        return data
1✔
201

202

203
    def _recv(self, numb = None, timeout = default):
1✔
204
        """_recv(numb = 4096, timeout = default) -> str
205

206
        Receives one chunk of data from the internal buffer, or from the OS if the
207
        buffer is empty.
208
        """
209
        numb = self.buffer.get_fill_size(numb)
1✔
210

211
        # No buffered data, could not put anything in the buffer
212
        # before timeout.
213
        if not self.buffer and not self._fillbuffer(timeout):
1✔
214
            return b''
1✔
215

216
        return self.buffer.get(numb)
1✔
217

218
    def recvpred(self, pred, timeout = default):
1✔
219
        """recvpred(pred, timeout = default) -> bytes
220

221
        Receives one byte at a time from the tube, until ``pred(all_bytes)``
222
        evaluates to True.
223

224
        If the request is not satisfied before ``timeout`` seconds pass,
225
        all data is buffered and an empty string (``''``) is returned.
226

227
        Arguments:
228
            pred(callable): Function to call, with the currently-accumulated data.
229
            timeout(int): Timeout for the operation
230

231
        Raises:
232
            exceptions.EOFError: The connection is closed
233

234
        Returns:
235
            A bytes object containing bytes received from the socket,
236
            or ``''`` if a timeout occurred while waiting.
237

238
        Examples:
239

240
            >>> t = tube()
241
            >>> t.recv_raw = lambda n: b'abbbaccc'
242
            >>> pred = lambda p: p.count(b'a') == 2
243
            >>> t.recvpred(pred)
244
            b'abbba'
245
            >>> pred = lambda p: p.count(b'd') > 0
246
            >>> t.recvpred(pred, timeout=0.05)
247
            b''
248
        """
249

250
        data = b''
1✔
251

252
        with self.countdown(timeout):
1✔
253
            while not pred(data):
1✔
254
                if not self.countdown_active():
1✔
255
                    self.unrecv(data)
1✔
256
                    return b''
1✔
257

258
                try:
1✔
259
                    res = self.recv(1, timeout=timeout)
1✔
260
                except Exception:
×
261
                    self.unrecv(data)
×
262
                    return b''
×
263

264
                if res:
1!
265
                    data += res
1✔
266
                else:
267
                    self.unrecv(data)
×
268
                    return b''
×
269

270
        return data
1✔
271

272
    def recvn(self, numb, timeout = default):
1✔
273
        """recvn(numb, timeout = default) -> bytes
274

275
        Receives exactly `numb` bytes.
276

277
        If the request is not satisfied before ``timeout`` seconds pass,
278
        all data is buffered and an empty string (``''``) is returned.
279

280
        Raises:
281
            exceptions.EOFError: The connection closed before the request could be satisfied
282

283
        Returns:
284
            A string containing bytes received from the socket,
285
            or ``''`` if a timeout occurred while waiting.
286

287
        Examples:
288

289
            >>> t = tube()
290
            >>> data = b'hello world'
291
            >>> t.recv_raw = lambda *a: data
292
            >>> t.recvn(len(data)) == data
293
            True
294
            >>> t.recvn(len(data)+1) == data + data[:1]
295
            True
296
            >>> t.recv_raw = lambda *a: None
297
            >>> # The remaining data is buffered
298
            >>> t.recv() == data[1:]
299
            True
300
            >>> t.recv_raw = lambda *a: time.sleep(0.01) or b'a'
301
            >>> t.recvn(10, timeout=0.05)
302
            b''
303
            >>> t.recvn(10, timeout=0.06)
304
            b'aaaaaa...'
305
        """
306
        # Keep track of how much data has been received
307
        # It will be pasted together at the end if a
308
        # timeout does not occur, or put into the tube buffer.
309
        with self.countdown(timeout):
1✔
310
            while self.countdown_active() and len(self.buffer) < numb and self._fillbuffer(self.timeout):
1✔
311
                pass
1✔
312

313
        if len(self.buffer) < numb:
1✔
314
            return b''
1✔
315

316
        return self.buffer.get(numb)
1✔
317

318
    def recvuntil(self, delims, drop=False, timeout=default):
1✔
319
        """recvuntil(delims, drop=False, timeout=default) -> bytes
320

321
        Receive data until one of `delims` is encountered.
322

323
        If the request is not satisfied before ``timeout`` seconds pass,
324
        all data is buffered and an empty string (``''``) is returned.
325

326
        Arguments:
327
            delims(bytes,tuple): A single delimiter byte-string, or a list/tuple of delimiter byte-strings.
328
            drop(bool): Drop the ending.  If :const:`True` it is removed from the end of the return value.
329

330
        Raises:
331
            exceptions.EOFError: The connection closed before the request could be satisfied
332

333
        Returns:
334
            A string containing bytes received from the socket,
335
            or ``''`` if a timeout occurred while waiting.
336

337
        Examples:
338

339
            >>> t = tube()
340
            >>> t.recv_raw = lambda n: b"Hello World!"
341
            >>> t.recvuntil(b' ')
342
            b'Hello '
343
            >>> _=t.clean(0)
344
            >>> # Matches on 'o' in 'Hello'
345
            >>> t.recvuntil((b' ',b'W',b'o',b'r'))
346
            b'Hello'
347
            >>> _=t.clean(0)
348
            >>> # Matches expressly full string
349
            >>> t.recvuntil(b' Wor')
350
            b'Hello Wor'
351
            >>> _=t.clean(0)
352
            >>> # Matches on full string, drops match
353
            >>> t.recvuntil(b' Wor', drop=True)
354
            b'Hello'
355

356
            >>> # Try with regex special characters
357
            >>> t = tube()
358
            >>> t.recv_raw = lambda n: b"Hello|World"
359
            >>> t.recvuntil(b'|', drop=True)
360
            b'Hello'
361

362
        """
363
        # Convert string into singleton tuple
364
        if isinstance(delims, (bytes, bytearray, str)):
1✔
365
            delims = (delims,)
1✔
366
        delims = tuple(map(packing._need_bytes, delims))
1✔
367

368
        # Longest delimiter for tracking purposes
369
        longest = max(map(len, delims))
1✔
370

371
        # Cumulative data to search
372
        data = []
1✔
373
        top = b''
1✔
374

375
        with self.countdown(timeout):
1✔
376
            while self.countdown_active():
1!
377
                try:
1✔
378
                    res = self.recv(timeout=self.timeout)
1✔
379
                except Exception:
1✔
380
                    self.unrecv(b''.join(data) + top)
1✔
381
                    raise
1✔
382

383
                if not res:
1✔
384
                    self.unrecv(b''.join(data) + top)
1✔
385
                    return b''
1✔
386

387
                top += res
1✔
388
                start = len(top)
1✔
389
                for d in delims:
1✔
390
                    j = top.find(d)
1✔
391
                    if start > j > -1:
1✔
392
                        start = j
1✔
393
                        end = j + len(d)
1✔
394
                if start < len(top):
1✔
395
                    self.unrecv(top[end:])
1✔
396
                    if drop:
1✔
397
                        top = top[:start]
1✔
398
                    else:
399
                        top = top[:end]
1✔
400
                    return b''.join(data) + top
1✔
401
                if len(top) > longest:
1✔
402
                    i = -longest - 1
1✔
403
                    data.append(top[:i])
1✔
404
                    top = top[i:]
1✔
405

UNCOV
406
        return b''
×
407

408
    def recvlines(self, numlines=2**20, keepends=None, drop=None, timeout=default):
1✔
409
        r"""recvlines(numlines, drop=True, timeout=default) -> list of bytes objects
410

411
        Receive up to ``numlines`` lines.
412

413
        A "line" is any sequence of bytes terminated by the byte sequence
414
        set by :attr:`newline`, which defaults to ``'\n'``.
415

416
        If the request is not satisfied before ``timeout`` seconds pass,
417
        all data is buffered and an empty string (``''``) is returned.
418

419
        Arguments:
420
            numlines(int): Maximum number of lines to receive
421
            drop(bool): Drop newlines at the end of each line (:const:`True`).
422
            timeout(int): Maximum timeout
423

424
        Raises:
425
            exceptions.EOFError: The connection closed before the request could be satisfied
426

427
        Returns:
428
            A list of bytes objects, one per line received; if a timeout
428
            occurs, only the lines received before it are returned.
430

431
        Examples:
432

433
            >>> t = tube()
434
            >>> t.recv_raw = lambda n: b'\n'
435
            >>> t.recvlines(3)
436
            [b'', b'', b'']
437
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
438
            >>> t.recvlines(3)
439
            [b'Foo', b'Bar', b'Baz']
440
            >>> t.recvlines(3, True)
441
            [b'Foo\n', b'Bar\n', b'Baz\n']
442
            >>> t.recvlines(3, drop=False)
443
            [b'Foo\n', b'Bar\n', b'Baz\n']
444
        """
445
        drop = self._normalize_keepends_drop(keepends, drop, True)
1✔
446
        del keepends
1✔
447

448
        lines = []
1✔
449
        with self.countdown(timeout):
1✔
450
            for _ in range(numlines):
1✔
451
                try:
1✔
452
                    # We must set 'drop' to False here so that we can
453
                    # restore the original, unmodified data to the buffer
454
                    # in the event of a timeout.
455
                    res = self.recvline(drop=False, timeout=timeout)
1✔
456
                except Exception:
×
457
                    self.unrecv(b''.join(lines))
×
458
                    raise
×
459

460
                if res:
1!
461
                    lines.append(res)
1✔
462
                else:
463
                    break
×
464

465
        if drop:
1✔
466
            lines = [line.rstrip(self.newline) for line in lines]
1✔
467

468
        return lines
1✔
469

470
    def recvlinesS(self, numlines=2**20, keepends=None, drop=None, timeout=default):
1✔
471
        r"""recvlinesS(numlines, drop=True, timeout=default) -> str list
472

473
        This function is identical to :meth:`recvlines`, but decodes
474
        the received bytes into string using :func:`context.encoding`.
475
        You should use :meth:`recvlines` whenever possible for better performance.
476

477
        Examples:
478

479
            >>> t = tube()
480
            >>> t.recv_raw = lambda n: b'\n'
481
            >>> t.recvlinesS(3)
482
            ['', '', '']
483
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
484
            >>> t.recvlinesS(3)
485
            ['Foo', 'Bar', 'Baz']
486
        """
487
        return [packing._decode(x) for x in self.recvlines(numlines, keepends=keepends, drop=drop, timeout=timeout)]
1✔
488

489
    def recvlinesb(self, numlines=2**20, keepends=None, drop=None, timeout=default):
1✔
490
        r"""recvlinesb(numlines, drop=True, timeout=default) -> bytearray list
491

492
        This function is identical to :meth:`recvlines`, but returns a list of bytearrays.
493

494
        Examples:
495

496
            >>> t = tube()
497
            >>> t.recv_raw = lambda n: b'\n'
498
            >>> t.recvlinesb(3)
499
            [bytearray(b''), bytearray(b''), bytearray(b'')]
500
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
501
            >>> t.recvlinesb(3)
502
            [bytearray(b'Foo'), bytearray(b'Bar'), bytearray(b'Baz')]
503
        """
504
        return [bytearray(x) for x in self.recvlines(numlines, keepends=keepends, drop=drop, timeout=timeout)]
1✔
505

506
    def recvline(self, keepends=None, drop=None, timeout=default):
1✔
507
        r"""recvline(drop=False, timeout=default) -> bytes
508

509
        Receive a single line from the tube.
510

511
        A "line" is any sequence of bytes terminated by the byte sequence
512
        set in :attr:`newline`, which defaults to ``b'\n'``.
513

514
        If the connection is closed (:class:`EOFError`) before a newline
515
        is received, the buffered data is returned by default and a warning
516
        is logged. If the buffer is empty, an :class:`EOFError` is raised.
517
        This behavior can be changed by setting :meth:`pwnlib.context.ContextType.throw_eof_on_incomplete_line`.
518

519
        If the request is not satisfied before ``timeout`` seconds pass,
520
        all data is buffered and an empty byte string (``b''``) is returned.
521

522
        Arguments:
523
            drop(bool): Drop the line ending (:const:`False`).
524
            timeout(int): Timeout
525

526
        Raises:
527
            :class:`EOFError`: The connection closed before the request
528
                                 could be satisfied and the buffer is empty
529

530
        Return:
531
            All bytes received over the tube until the first
532
            newline ``'\n'`` is received.  Optionally retains
533
            the ending. If the connection is closed before a newline
534
            is received, the remaining data received up to this point
535
            is returned.
536

537

538
        Examples:
539

540
            >>> t = tube()
541
            >>> t.recv_raw = lambda n: b'Foo\nBar\r\nBaz\n'
542
            >>> t.recvline()
543
            b'Foo\n'
544
            >>> t.recvline()
545
            b'Bar\r\n'
546
            >>> t.recvline(False)
547
            b'Baz'
548
            >>> t.newline = b'\r\n'
549
            >>> t.recvline(drop=True)
550
            b'Foo\nBar'
551
            >>> t = tube()
552
            >>> def _recv_eof(n):
553
            ...     if not _recv_eof.throw:
554
            ...         _recv_eof.throw = True
555
            ...         return b'real line\ntrailing data'
556
            ...     raise EOFError
557
            >>> _recv_eof.throw = False
558
            >>> t.recv_raw = _recv_eof
559
            >>> t.recvline()
560
            b'real line\n'
561
            >>> t.recvline()
562
            b'trailing data'
563
            >>> t.recvline()
564
            Traceback (most recent call last):
565
                ...
566
            EOFError
567
        """
568
        drop = self._normalize_keepends_drop(keepends, drop, False)
1✔
569
        del keepends
1✔
570

571
        try:
1✔
572
            return self.recvuntil(self.newline, drop=drop, timeout=timeout)
1✔
573
        except EOFError:
1✔
574
            if not context.throw_eof_on_incomplete_line and self.buffer.size > 0:
1✔
575
                if context.throw_eof_on_incomplete_line is None:
1!
576
                    self.warn_once('EOFError during recvline. Returning buffered data without trailing newline.')
1✔
577
                return self.buffer.get()
1✔
578
            raise
1✔
579

580
    def recvline_pred(self, pred, keepends=None, drop=None, timeout=default):
1✔
581
        r"""recvline_pred(pred, drop=True, timeout=default) -> bytes
582

583
        Receive data until ``pred(line)`` returns a truthy value.
584
        Drop all other data.
585

586
        If the request is not satisfied before ``timeout`` seconds pass,
587
        all data is buffered and an empty string (``''``) is returned.
588

589
        Arguments:
590
            pred(callable): Function to call.  Returns the line for which
591
                this function returns :const:`True`.
592
            drop(bool): Drop the line ending (:const:`True`).
593

594
        Examples:
595

596
            >>> t = tube()
597
            >>> t.recv_raw = lambda n: b"Foo\nBar\nBaz\n"
598
            >>> t.recvline_pred(lambda line: line == b"Bar\n")
599
            b'Bar'
600
            >>> t.recvline_pred(lambda line: line == b"Bar\n", True)
601
            b'Bar\n'
602
            >>> t.recvline_pred(lambda line: line == b"Bar\n", drop=False)
603
            b'Bar\n'
604
            >>> t.recvline_pred(lambda line: line == b'Nope!', timeout=0.1)
605
            b''
606
        """
607
        drop = self._normalize_keepends_drop(keepends, drop, True)
1✔
608
        del keepends
1✔
609

610
        tmpbuf = Buffer()
1✔
611
        line   = b''
1✔
612
        with self.countdown(timeout):
1✔
613
            while self.countdown_active():
1✔
614
                try:
1✔
615
                    line = self.recvline(drop=False)
1✔
616
                except Exception:
×
617
                    self.buffer.unget(tmpbuf)
×
618
                    raise
×
619

620
                if not line:
1!
UNCOV
621
                    self.buffer.unget(tmpbuf)
×
UNCOV
622
                    return b''
×
623

624
                if pred(line):
1✔
625
                    if drop:
1✔
626
                        line = line.rstrip(self.newline)
1✔
627
                    return line
1✔
628
                else:
629
                    tmpbuf.add(line)
1✔
630

631
        return b''
1✔
632

633
    def recvline_contains(self, items, keepends=None, drop=None, timeout=default):
1✔
634
        r"""recvline_contains(items, drop=True, timeout=default) -> bytes
635

636
        Receive lines until one line is found which contains at least
637
        one of `items`.
638

639
        Arguments:
640
            items(str,tuple): List of strings to search for, or a single string.
641
            drop(bool): Drop the line ending (:const:`True`).
642
            timeout(int): Timeout, in seconds
643

644
        Examples:
645

646
            >>> t = tube()
647
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
648
            >>> t.recvline_contains(b'r')
649
            b'World'
650
            >>> f = lambda n: b"cat dog bird\napple pear orange\nbicycle car train\n"
651
            >>> t = tube()
652
            >>> t.recv_raw = f
653
            >>> t.recvline_contains(b'pear')
654
            b'apple pear orange'
655
            >>> t = tube()
656
            >>> t.recv_raw = f
657
            >>> t.recvline_contains((b'car', b'train'))
658
            b'bicycle car train'
659
        """
660
        if isinstance(items, (bytes, bytearray, str)):
1✔
661
            items = (items,)
1✔
662
        items = tuple(map(packing._need_bytes, items))
1✔
663

664
        def pred(line):
1✔
665
            return any(d in line for d in items)
1✔
666

667
        return self.recvline_pred(pred, keepends=keepends, drop=drop, timeout=timeout)
1✔
668

669
    def recvline_startswith(self, delims, keepends=None, drop=None, timeout=default):
1✔
670
        r"""recvline_startswith(delims, drop=True, timeout=default) -> bytes
671

672
        Keep receiving lines until one is found that starts with one of
673
        `delims`.  Returns the last line received.
674

675
        If the request is not satisfied before ``timeout`` seconds pass,
676
        all data is buffered and an empty string (``''``) is returned.
677

678
        Arguments:
679
            delims(str,tuple): List of strings to search for, or a single string
680
            drop(bool): Drop the line ending (:const:`True`).
681
            timeout(int): Timeout, in seconds
682

683
        Returns:
684
            The first line received which starts with a delimiter in ``delims``.
685

686
        Examples:
687

688
            >>> t = tube()
689
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
690
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'))
691
            b'World'
692
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'), drop=False)
693
            b'Xylophone\n'
694
            >>> t.recvline_startswith(b'Wo')
695
            b'World'
696
        """
697
        # Convert string into singleton tuple
698
        if isinstance(delims, (bytes, bytearray, str)):
1✔
699
            delims = (delims,)
1✔
700
        delims = tuple(map(packing._need_bytes, delims))
1✔
701

702
        return self.recvline_pred(lambda line: any(map(line.startswith, delims)),
1✔
703
                                  keepends=keepends,
704
                                  drop=drop,
705
                                  timeout=timeout)
706

707
    def recvline_endswith(self, delims, keepends=None, drop=None, timeout=default):
1✔
708
        r"""recvline_endswith(delims, drop=True, timeout=default) -> bytes
709

710
        Keep receiving lines until one is found that ends with one of
711
        `delims`.  Returns the last line received.
712

713
        If the request is not satisfied before ``timeout`` seconds pass,
714
        all data is buffered and an empty string (``''``) is returned.
715

716
        See :meth:`recvline_startswith` for more details.
717

718
        Examples:
719

720
            >>> t = tube()
721
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\nKaboodle\n'
722
            >>> t.recvline_endswith(b'r')
723
            b'Bar'
724
            >>> t.recvline_endswith((b'a',b'b',b'c',b'd',b'e'), drop=False)
725
            b'Kaboodle\n'
726
            >>> t.recvline_endswith(b'oodle')
727
            b'Kaboodle'
728
        """
729
        # Convert string into singleton tuple
730
        if isinstance(delims, (bytes, bytearray, str)):
1✔
731
            delims = (delims,)
1✔
732

733
        delims = tuple(packing._need_bytes(delim) + self.newline for delim in delims)
1✔
734

735
        return self.recvline_pred(lambda line: any(map(line.endswith, delims)),
1✔
736
                                  keepends=keepends,
737
                                  drop=drop,
738
                                  timeout=timeout)
739

740
    def recvregex(self, regex, exact=False, timeout=default, capture=False):
1✔
741
        r"""recvregex(regex, exact=False, timeout=default, capture=False) -> bytes
742

743
        Wrapper around :func:`recvpred`, which will return when a regex
744
        matches the string in the buffer.
745

746
        Returns all received data up until the regex matched. If `capture` is
747
        set to True, a :class:`re.Match` object is returned instead.
748

749
        By default :func:`re.RegexObject.search` is used, but if `exact` is
750
        set to True, then :func:`re.RegexObject.match` will be used instead.
751

752
        If the request is not satisfied before ``timeout`` seconds pass,
753
        all data is buffered and an empty string (``''``) is returned.
754

755
        Examples:
756

757
            >>> t = tube()
758
            >>> t.recv_raw = lambda n: b'The lucky number is 1337 as always\nBla blubb blargh\n'
759
            >>> m = t.recvregex(br'number is ([0-9]+) as always\n', capture=True)
760
            >>> m.group(1)
761
            b'1337'
762
            >>> t.recvregex(br'Bla .* blargh\n')
763
            b'Bla blubb blargh\n'
764
        """
765

766
        if isinstance(regex, (bytes, bytearray, str)):
1!
767
            regex = packing._need_bytes(regex)
1✔
768
            regex = re.compile(regex)
1✔
769

770
        if exact:
1!
771
            pred = regex.match
×
772
        else:
773
            pred = regex.search
1✔
774

775
        if capture:
1✔
776
            return pred(self.recvpred(pred, timeout = timeout))
1✔
777
        else:
778
            return self.recvpred(pred, timeout = timeout)
1✔
779

780
    def recvline_regex(self, regex, exact=False, keepends=None, drop=None, timeout=default):
1✔
781
        """recvline_regex(regex, exact=False, drop=True, timeout=default) -> bytes
782

783
        Wrapper around :func:`recvline_pred`, which will return when a regex
784
        matches a line.
785

786
        By default :func:`re.RegexObject.search` is used, but if `exact` is
787
        set to True, then :func:`re.RegexObject.match` will be used instead.
788

789
        If the request is not satisfied before ``timeout`` seconds pass,
790
        all data is buffered and an empty string (``''``) is returned.
791
        """
792

793
        if isinstance(regex, (bytes, bytearray, str)):
×
794
            regex = packing._need_bytes(regex)
×
795
            regex = re.compile(regex)
×
796

797
        if exact:
×
798
            pred = regex.match
×
799
        else:
800
            pred = regex.search
×
801

802
        return self.recvline_pred(pred, keepends=keepends, drop=drop, timeout=timeout)
×
803

804
    def recvrepeat(self, timeout=default):
        """recvrepeat(timeout=default) -> bytes

        Receives data until a timeout or EOF is reached.

        The internal buffer is filled repeatedly until a fill attempt
        returns a false value or raises :class:`EOFError`; everything
        buffered so far is then returned.

        Examples:

            >>> data = [
            ... b'd',
            ... b'', # simulate timeout
            ... b'c',
            ... b'b',
            ... b'a',
            ... ]
            >>> def delayrecv(n, data=data):
            ...     return data.pop()
            >>> t = tube()
            >>> t.recv_raw = delayrecv
            >>> t.recvrepeat(0.2)
            b'abc'
            >>> t.recv()
            b'd'
        """

        try:
            while self._fillbuffer(timeout=timeout):
                pass
        except EOFError:
            # EOF simply ends the collection; buffered data is still returned.
            pass

        return self.buffer.get()
835

836
    def recvall(self, timeout=Timeout.forever):
        """recvall(timeout=Timeout.forever) -> bytes

        Receives data until EOF is reached and closes the tube.

        Progress (the human-readable size of the buffered data) is reported
        through the ``waitfor`` handle while receiving. The loop also stops
        when a fill attempt returns a false value.

        Returns:
            All buffered data.
        """

        with self.waitfor('Receiving all data') as h:
            with self.local(timeout):
                try:
                    while True:
                        received = misc.size(len(self.buffer))
                        h.status(received)
                        if not self._fillbuffer():
                            break
                except EOFError:
                    # EOF is the expected way for this loop to end.
                    pass
            h.success("Done (%s)" % received)
        self.close()

        return self.buffer.get()
857

858
    def send(self, data):
        """send(data)

        Sends data.

        If log level ``DEBUG`` is enabled, also prints out the data
        sent.

        If it is not possible to send anymore because of a closed
        connection, it raises ``exceptions.EOFError``

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.send(b'hello')
            b'hello'
        """

        data = packing._need_bytes(data)

        # Skip the hexdump work entirely unless debug logging is active.
        if self.isEnabledFor(logging.DEBUG):
            self.debug('Sent %#x bytes:' % len(data))
            self.maybe_hexdump(data, level=logging.DEBUG)

        self.send_raw(data)
885

886
    def sendline(self, line=b''):
        r"""sendline(data)

        Shorthand for ``t.send(data + t.newline)``.

        Arguments:
            line: The data to send; the tube's ``newline`` is appended.
                Defaults to empty, i.e. only the newline is sent.

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.sendline(b'hello')
            b'hello\n'
            >>> t.newline = b'\r\n'
            >>> t.sendline(b'hello')
            b'hello\r\n'
        """

        line = packing._need_bytes(line)

        self.send(line + self.newline)
906

907
    def sendlines(self, lines=()):
        """sendlines(lines)

        Sends every element of ``lines`` with :meth:`sendline`, in order.

        Arguments:
            lines: An iterable of lines. Each one is converted to bytes
                before being sent. Defaults to an empty sequence, in which
                case nothing is sent.
        """
        # The default is an immutable tuple so no state is shared between calls.
        for line in lines:
            line = packing._need_bytes(line)
            self.sendline(line)
911

912
    def sendafter(self, delim, data, timeout = default):
        """sendafter(delim, data, timeout = default) -> bytes

        A combination of ``recvuntil(delim, timeout=timeout)`` and ``send(data)``.

        Returns:
            Whatever ``recvuntil`` returned. ``data`` is sent afterwards
            regardless of that result.
        """

        # Convert first so a bad ``data`` argument fails before any I/O happens.
        data = packing._need_bytes(data)
        res = self.recvuntil(delim, timeout=timeout)
        self.send(data)
        return res
922

923
    def sendlineafter(self, delim, data, timeout = default):
        """sendlineafter(delim, data, timeout = default) -> bytes

        A combination of ``recvuntil(delim, timeout=timeout)`` and ``sendline(data)``.

        Returns:
            Whatever ``recvuntil`` returned. The line is sent afterwards
            regardless of that result.
        """

        # Convert first so a bad ``data`` argument fails before any I/O happens.
        data = packing._need_bytes(data)
        res = self.recvuntil(delim, timeout=timeout)
        self.sendline(data)
        return res
932

933
    def sendthen(self, delim, data, timeout = default):
        """sendthen(delim, data, timeout = default) -> bytes

        A combination of ``send(data)`` and ``recvuntil(delim, timeout=timeout)``.

        Returns:
            Whatever ``recvuntil`` returned.
        """

        data = packing._need_bytes(data)
        self.send(data)
        return self.recvuntil(delim, timeout=timeout)
941

942
    def sendlinethen(self, delim, data, timeout = default):
        """sendlinethen(delim, data, timeout = default) -> bytes

        A combination of ``sendline(data)`` and ``recvuntil(delim, timeout=timeout)``.

        Returns:
            Whatever ``recvuntil`` returned.
        """

        data = packing._need_bytes(data)
        self.sendline(data)
        return self.recvuntil(delim, timeout=timeout)
950

951
    def interactive(self, prompt = term.text.bold_red('$') + ' '):
        """interactive(prompt = pwnlib.term.text.bold_red('$') + ' ')

        Does simultaneous reading and writing to the tube. In principle this just
        connects the tube to standard in and standard out, but in practice this
        is much more usable, since we are using :mod:`pwnlib.term` to print a
        floating prompt.

        Thus it only works while in :data:`pwnlib.term.term_mode`.

        A daemon thread copies received data to stdout while the calling
        thread forwards user input to the tube. The method returns once
        input is exhausted, sending fails with EOF, or the user interrupts
        with Ctrl-C, and the receiving thread has finished.

        Arguments:
            prompt: The prompt handed to ``term.readline.readline`` when
                running in term mode.
        """

        self.info('Switching to interactive mode')

        # Set to ask both loops (receiver thread and input loop) to stop.
        go = threading.Event()
        def recv_thread():
            while not go.is_set():
                try:
                    cur = self.recv(timeout = 0.05)
                    # Show the tube's newline as a plain '\n' locally.
                    cur = cur.replace(self.newline, b'\n')
                    if cur:
                        stdout = sys.stdout
                        if not term.term_mode:
                            # Write bytes to the underlying binary stream
                            # when the text wrapper exposes one.
                            stdout = getattr(stdout, 'buffer', stdout)
                        stdout.write(cur)
                        stdout.flush()
                except EOFError:
                    self.info('Got EOF while reading in interactive')
                    break

        t = context.Thread(target = recv_thread)
        t.daemon = True
        t.start()

        from pwnlib.args import term_mode
        try:
            os_linesep = os.linesep.encode()
            # Remainder of a multi-byte OS line separator that we still
            # expect to read after having seen its first byte.
            to_skip = b''
            while not go.is_set():
                if term.term_mode:
                    data = term.readline.readline(prompt = prompt, float = True)
                    if data.endswith(b'\n') and self.newline != b'\n':
                        data = data[:-1] + self.newline
                else:
                    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
                    data = stdin.read(1)
                    # Keep OS's line separator if NOTERM is set and
                    # the user did not specify a custom newline
                    # even if stdin is a tty.
                    if sys.stdin.isatty() and (
                        term_mode
                        or context.newline != b"\n"
                        or self._newline is not None
                    ):
                        if to_skip:
                            if to_skip[:1] != data:
                                # The separator was started but not finished:
                                # emit the part already consumed plus the new
                                # byte, and forget the pending remainder so it
                                # is not re-emitted on every following byte.
                                data = os_linesep[: -len(to_skip)] + data
                                to_skip = b''
                            else:
                                to_skip = to_skip[1:]
                                if to_skip:
                                    continue
                                data = self.newline
                        # If we observe a prefix of the line separator in a tty,
                        # assume we'll see the rest of it immediately after.
                        # This could stall until the next character is seen if
                        # the line separator is started but never finished, but
                        # that is unlikely to happen in a dynamic tty.
                        elif data and os_linesep.startswith(data):
                            if len(os_linesep) > 1:
                                to_skip = os_linesep[1:]
                                continue
                            data = self.newline

                if data:
                    try:
                        self.send(data)
                    except EOFError:
                        go.set()
                        self.info('Got EOF while sending in interactive')
                else:
                    # Empty read: input is exhausted, stop both loops.
                    go.set()
        except KeyboardInterrupt:
            self.info('Interrupted')
            go.set()

        # Join with a short timeout in a loop rather than blocking
        # indefinitely on the receiver thread.
        while t.is_alive():
            t.join(timeout = 0.1)
1037

1038
    def stream(self, line_mode=True):
        """stream()

        Receive data until the tube exits, and print it to stdout.

        Similar to :func:`interactive`, except that no input is sent.

        Similar to ``print(tube.recvall())`` except that data is printed
        as it is received, rather than after all data is received.

        Streaming stops on :class:`EOFError` or :class:`KeyboardInterrupt`;
        neither is propagated to the caller.

        Arguments:
            line_mode(bool): Whether to receive line-by-line or raw data.

        Returns:
            All data printed.
        """
        buf = Buffer()
        function = self.recvline if line_mode else self.recv
        try:
            while True:
                chunk = function()
                if not chunk:
                    # Nothing was received this round; there is nothing new
                    # to record or print.
                    continue
                buf.add(chunk)
                stdout = sys.stdout
                if not term.term_mode:
                    stdout = getattr(stdout, 'buffer', stdout)
                # Write the chunk itself instead of peeking at the buffer's
                # internal list, and flush so output appears as it arrives.
                stdout.write(chunk)
                stdout.flush()
        except KeyboardInterrupt:
            pass
        except EOFError:
            pass

        return buf.get()
1069

1070
    def clean(self, timeout = 0.05):
        """clean(timeout = 0.05)

        Removes all the buffered data from a tube by calling
        :meth:`pwnlib.tubes.tube.tube.recv` with a low timeout until it fails.

        If ``timeout`` is zero, only cached data will be cleared.

        Note: If timeout is set to zero, the underlying network is
        not actually polled; only the internal buffer is cleared.

        Returns:

            All data received

        Examples:

            >>> t = tube()
            >>> t.unrecv(b'clean me up')
            >>> t.clean(0)
            b'clean me up'
            >>> len(t.buffer)
            0
        """
        # A zero timeout only drains what is already buffered.
        if timeout == 0:
            return self.buffer.get()

        return self.recvrepeat(timeout)
1098

1099
    def clean_and_log(self, timeout = 0.05):
        r"""clean_and_log(timeout = 0.05)

        Works exactly as :meth:`pwnlib.tubes.tube.tube.clean`, but logs the
        received data. Logging is done at debug level, with the context log
        level temporarily set to ``debug``.

        Returns:

            All data received

        Examples:

            >>> def recv(n, data=[b'', b'hooray_data']):
            ...     while data: return data.pop()
            >>> t = tube()
            >>> t.recv_raw      = recv
            >>> t.connected_raw = lambda d: True
            >>> t.fileno        = lambda: 1234
            >>> with context.local(log_level='info'):
            ...     data = t.clean_and_log()
            [...] Received 0xb bytes:
                b'hooray_data'
            >>> data
            b'hooray_data'
            >>> context.clear()
        """
        cached_data = self.buffer.get()
        # Log the already-buffered data here, but only when debug logging is
        # not enabled for this tube.
        if cached_data and not self.isEnabledFor(logging.DEBUG):
            with context.local(log_level='debug'):
                self.debug('Received %#x bytes:' % len(cached_data))
                self.maybe_hexdump(cached_data, level=logging.DEBUG)
        with context.local(log_level='debug'):
            return cached_data + self.clean(timeout)
1132

1133
    def upload_manually(self, data, target_path = './payload', prompt = b'$', chunk_size = 0x200, chmod_flags = 'u+x', compression='auto', end_marker = 'PWNTOOLS_DONE'):
        """upload_manually(data, target_path = './payload', prompt = b'$', chunk_size = 0x200, chmod_flags = 'u+x', compression='auto', end_marker = 'PWNTOOLS_DONE')

        Upload a file manually using base64 encoding and compression.
        This can be used when the tube is connected to a shell.

        The file is uploaded in base64-encoded chunks by appending to a file
        and then decompressing it:

        .. code-block::

            loop:
                echo <chunk> | base64 -d >> <target_path>.<compression>
            <compression> -d -f <target_path>.<compression>
            chmod <chmod_flags> <target_path>

        It is assumed that a `base64` command is available on the target system.
        When ``compression`` is ``auto`` the best compression utility available
        between ``gzip`` and ``xz`` is chosen with a fallback to uncompressed
        upload.

        Arguments:

            data(bytes): The data to upload.
            target_path(str): The path to upload the data to.
            prompt(bytes): The shell prompt to wait for.
            chunk_size(int): The size of each chunk to upload.
            chmod_flags(str): The flags to use with chmod. ``""`` to ignore.
            compression(str): The compression to use. ``auto`` to automatically choose the best compression or ``gzip`` or ``xz``.
            end_marker(str): The marker to use to detect the end of the output. Only used when prompt is not set.

        Examples:

        .. doctest::
            :options: +POSIX +TODO

            >>> l = listen()
            >>> l.spawn_process('/bin/sh')
            >>> r = remote('127.0.0.1', l.lport)
            >>> r.upload_manually(b'some\\xca\\xfedata\\n', prompt=b'', chmod_flags='')
            >>> r.sendline(b'cat ./payload')
            >>> r.recvline()
            b'some\\xca\\xfedata\\n'

            >>> r.upload_manually(cyclic(0x1000), target_path='./cyclic_pattern', prompt=b'', chunk_size=0x10, compression='gzip')
            >>> r.sendline(b'sha256sum ./cyclic_pattern')
            >>> r.recvlineS(keepends=False).startswith(sha256sumhex(cyclic(0x1000)))
            True

            >>> blob = ELF.from_assembly(shellcraft.echo('Hello world!\\n') + shellcraft.exit(0))
            >>> r.upload_manually(blob.data, prompt=b'')
            >>> r.sendline(b'./payload')
            >>> r.recvline()
            b'Hello world!\\n'
            >>> r.close()
            >>> l.close()
        """
        # Without a prompt to wait for, every command echoes ``end_marker``
        # so that the end of its output can be detected.
        echo_end = ""
        if not prompt:
            echo_end = "; echo {}".format(end_marker)
            end_markerb = end_marker.encode()
        else:
            end_markerb = prompt

        # Detect available compression utility, fallback to uncompressed upload.
        compression_mode = None
        possible_compression = ['xz', 'gzip']
        if not prompt:
            # Prime the stream with one marker for the first sendlineafter().
            self.sendline("echo {}".format(end_marker).encode())
        if compression == 'auto':
            for utility in possible_compression:
                self.sendlineafter(end_markerb, "command -v {} && echo YEP || echo NOPE{}".format(utility, echo_end).encode())
                result = self.recvuntil([b'YEP', b'NOPE'])
                if b'YEP' in result:
                    compression_mode = utility
                    break
        elif compression in possible_compression:
            compression_mode = compression
        else:
            self.error('Invalid compression mode: %s, has to be one of %s', compression, possible_compression)

        self.debug('Manually uploading using compression mode: %s', compression_mode)

        compressed_data = b''
        if compression_mode == 'xz':
            import lzma
            compressed_data = lzma.compress(data, format=lzma.FORMAT_XZ, preset=9)
            compressed_path = target_path + '.xz'
        elif compression_mode == 'gzip':
            import gzip
            from io import BytesIO
            f = BytesIO()
            with gzip.GzipFile(fileobj=f, mode='wb', compresslevel=9) as g:
                g.write(data)
            compressed_data = f.getvalue()
            compressed_path = target_path + '.gz'
        else:
            compressed_path = target_path

        # Don't compress if it doesn't reduce the size. When no utility was
        # selected ``compressed_data`` is still empty and must never replace
        # the payload, otherwise nothing would be uploaded.
        if compression_mode is None or len(compressed_data) >= len(data):
            compression_mode = None
            compressed_path = target_path
        else:
            data = compressed_data

        # Upload data in `chunk_size` chunks. Assume base64 is available.
        # Ceiling division, so an exact multiple does not report an extra chunk.
        total_chunks = (len(data) + chunk_size - 1) // chunk_size
        with self.progress('Uploading payload') as p:
            for idx, offset in enumerate(range(0, len(data), chunk_size)):
                chunk = data[offset:offset + chunk_size]
                # The first chunk truncates/creates the file, later ones append.
                redirect = '>' if idx == 0 else '>>'
                self.sendlineafter(end_markerb, "echo {} | base64 -d {} {}{}".format(fiddling.b64e(bytearray(chunk)), redirect, compressed_path, echo_end).encode())
                p.status('{}/{} {}'.format(idx+1, total_chunks, misc.size(offset + len(chunk))))
            p.success(misc.size(len(data)))

        # Decompress the file and set the permissions.
        if compression_mode is not None:
            self.sendlineafter(end_markerb, '{} -d -f {}{}'.format(compression_mode, compressed_path, echo_end).encode())
        if chmod_flags:
            self.sendlineafter(end_markerb, 'chmod {} {}{}'.format(chmod_flags, target_path, echo_end).encode())
        if not prompt:
            # Consume the marker echoed by the last command.
            self.recvuntil(end_markerb + b'\n')
1258

1259
    def connect_input(self, other):
        """connect_input(other)

        Connects the input of this tube to the output of another tube object.

        A daemon thread repeatedly receives from ``other`` and sends the
        data to this tube. When pumping ends, ``self.shutdown('send')`` and
        ``other.shutdown('recv')`` are called.

        Examples:

            >>> def p(x): print(x.decode())
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> import time
            >>> _=(b.connect_input(a), time.sleep(0.1))
            data
        """

        def pump():
            # NOTE(review): the ``_sys`` checks below presumably detect
            # interpreter shutdown from this daemon thread -- confirm.
            import sys as _sys
            while self.countdown_active():
                if not (self.connected('send') and other.connected('recv')):
                    break

                try:
                    data = other.recv(timeout = 0.05)
                except EOFError:
                    break

                if not _sys:
                    return

                # Nothing arrived within the short timeout; poll again.
                if not data:
                    continue

                try:
                    self.send(data)
                except EOFError:
                    break

                if not _sys:
                    return

            self.shutdown('send')
            other.shutdown('recv')

        t = context.Thread(target = pump)
        t.daemon = True
        t.start()
1315

1316
    def connect_output(self, other):
        """connect_output(other)

        Connects the output of this tube to the input of another tube object.

        This is implemented as ``other.connect_input(self)``.

        Examples:

            >>> def p(x): print(repr(x))
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> _=(a.connect_output(b), time.sleep(0.1))
            b'data'
        """

        other.connect_input(self)
1340

1341
    def connect_both(self, other):
        """connect_both(other)

        Connects the both ends of this tube object with another tube object.

        Calls :meth:`connect_input` and then :meth:`connect_output` with
        ``other``.
        """

        self.connect_input(other)
        self.connect_output(other)
1348

1349
    def spawn_process(self, *args, **kwargs):
        """Spawns a new process having this tube as stdin, stdout and stderr.

        Takes the same arguments as :class:`subprocess.Popen`.

        The value of :meth:`fileno` is passed as ``stdin``, ``stdout`` and
        ``stderr``, so those three keywords must not be supplied by the caller.

        Returns:
            The :class:`subprocess.Popen` object.
        """

        return subprocess.Popen(
            *args,
            stdin = self.fileno(),
            stdout = self.fileno(),
            stderr = self.fileno(),
            **kwargs
        )
1361

1362
    def __lshift__(self, other):
1✔
1363
        """
1364
        Shorthand for connecting multiple tubes.
1365

1366
        See :meth:`connect_input` for more information.
1367

1368
        Examples:
1369

1370
            The following are equivalent ::
1371

1372
                tube_a >> tube.b
1373
                tube_a.connect_input(tube_b)
1374

1375
            This is useful when chaining multiple tubes ::
1376

1377
                tube_a >> tube_b >> tube_a
1378
                tube_a.connect_input(tube_b)
1379
                tube_b.connect_input(tube_a)
1380
        """
1381
        self.connect_input(other)
×
1382
        return other
×
1383

1384
    def __rshift__(self, other):
1✔
1385
        """
1386
        Inverse of the ``<<`` operator.  See :meth:`__lshift__`.
1387

1388
        See :meth:`connect_input` for more information.
1389
        """
1390
        self.connect_output(other)
×
1391
        return other
×
1392

1393
    def __ne__(self, other):
1✔
1394
        """
1395
        Shorthand for connecting tubes to eachother.
1396

1397
        The following are equivalent ::
1398

1399
            a >> b >> a
1400
            a <> b
1401

1402
        See :meth:`connect_input` for more information.
1403
        """
1404
        self << other << self
×
1405

1406
    def wait_for_close(self, timeout=default):
        """Waits until the tube is closed.

        Polls :meth:`connected` while the countdown started with ``timeout``
        is active, sleeping at most 0.05 seconds between checks. Returns
        ``None`` both when the tube closed and when the countdown ran out.
        """

        with self.countdown(timeout):
            while self.countdown_active():
                if not self.connected():
                    return
                time.sleep(min(self.timeout, 0.05))

    # Short alias for wait_for_close.
    wait = wait_for_close
1416

1417
    def can_recv(self, timeout = 0):
        """can_recv(timeout = 0) -> bool

        Returns True, if there is data available within `timeout` seconds.

        Buffered data counts as available; the raw layer is only consulted
        when the buffer is empty.

        Examples:

            >>> import time
            >>> t = tube()
            >>> t.can_recv_raw = lambda *a: False
            >>> t.can_recv()
            False
            >>> _=t.unrecv(b'data')
            >>> t.can_recv()
            True
            >>> _=t.recv()
            >>> t.can_recv()
            False
        """

        return bool(self.buffer or self.can_recv_raw(timeout))
1438

1439
    def settimeout(self, timeout):
        """settimeout(timeout)

        Set the timeout for receiving operations. If the string "default"
        is given, then :data:`context.timeout` will be used. If None is given,
        then there will be no timeout.

        This simply assigns to ``self.timeout``.

        Examples:

            >>> t = tube()
            >>> t.settimeout_raw = lambda t: None
            >>> t.settimeout(3)
            >>> t.timeout == 3
            True
        """

        self.timeout = timeout
1456

1457

1458
    # Accepted direction aliases mapped to the canonical names 'recv' / 'send'
    # that are handed to the raw layer.
    shutdown_directions = {
        'in':    'recv',
        'read':  'recv',
        'recv':  'recv',
        'out':   'send',
        'write': 'send',
        'send':  'send',
    }

    # connected() accepts the same aliases plus 'any'.
    connected_directions = shutdown_directions.copy()
    connected_directions['any'] = 'any'
1469

1470
    def shutdown(self, direction = "send"):
        """shutdown(direction = "send")

        Closes the tube for further reading or writing depending on `direction`.

        Arguments:
          direction(str): Which direction to close; "in", "read" or "recv"
            closes the tube in the ingoing direction, "out", "write" or "send"
            closes it in the outgoing direction.

        Returns:
          :const:`None`

        Raises:
          KeyError: If ``direction`` is not one of the accepted names.

        Examples:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.shutdown_raw = p
            >>> _=list(map(t.shutdown, ('in', 'read', 'recv', 'out', 'write', 'send')))
            recv
            recv
            recv
            send
            send
            send
            >>> t.shutdown('bad_value')
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['in', 'out', 'read', 'recv', 'send', 'write']"
        """
        try:
            direction = self.shutdown_directions[direction]
        except KeyError:
            raise KeyError('direction must be in %r' % sorted(self.shutdown_directions))
        else:
            # ``direction`` is already canonical here; no second lookup needed.
            self.shutdown_raw(direction)
1506

1507
    def connected(self, direction = 'any'):
        """connected(direction = 'any') -> bool

        Returns True if the tube is connected in the specified direction.

        Arguments:
          direction(str): Can be the string 'any', 'in', 'read', 'recv',
                          'out', 'write', 'send'.

        Raises:
          KeyError: If ``direction`` is not one of the accepted names.

        Doctest:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.connected_raw = p
            >>> _=list(map(t.connected, ('any', 'in', 'read', 'recv', 'out', 'write', 'send')))
            any
            recv
            recv
            recv
            send
            send
            send
            >>> t.connected('bad_value')
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['any', 'in', 'out', 'read', 'recv', 'send', 'write']"
        """
        try:
            # Normalize the alias to the canonical name for the raw layer.
            direction = self.connected_directions[direction]
        except KeyError:
            raise KeyError('direction must be in %r' % sorted(self.connected_directions))
        else:
            return self.connected_raw(direction)
1540

1541
    def __enter__(self):
1✔
1542
        """Permit use of 'with' to control scoping and closing sessions.
1543

1544
        Examples:
1545

1546
            >>> t = tube()
1547
            >>> def p(x): print(x)
1548
            >>> t.close = lambda: p("Closed!")
1549
            >>> with t: pass
1550
            Closed!
1551
        """
1552
        return self
1✔
1553

1554
    def __exit__(self, type, value, traceback):
1✔
1555
        """Handles closing for 'with' statement
1556

1557
        See :meth:`__enter__`
1558
        """
1559
        self.close()
1✔
1560

1561
    # The minimal interface to be implemented by a child
1562
    @abc.abstractmethod
    def recv_raw(self, numb):
        """recv_raw(numb) -> bytes

        Should not be called directly. Receives data without using the buffer
        on the object.

        Unless there is a timeout or closed connection, this should always
        return data. In case of a timeout, it should return None, in case
        of a closed connection it should raise an ``exceptions.EOFError``.

        This base implementation always raises ``EOFError``.
        """

        raise EOFError('Not implemented')
1575

1576
    @abc.abstractmethod
    def send_raw(self, data):
        """send_raw(data)

        Should not be called directly. Sends data to the tube.

        Should raise ``exceptions.EOFError``, if it is unable to send any
        more, because of a closed tube.

        This base implementation always raises ``EOFError``.
        """

        raise EOFError('Not implemented')
1587

1588
    def settimeout_raw(self, timeout):
        """settimeout_raw(timeout)

        Should not be called directly. Sets the timeout for
        the tube.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
1596

1597
    def timeout_change(self):
        """
        Should not be called directly. Informs the raw layer of the tube that the timeout has changed.

        Forwards ``self.timeout`` to :meth:`settimeout_raw`.

        Inherited from :class:`Timeout`.
        """
        try:
            self.settimeout_raw(self.timeout)
        except NotImplementedError:
            # The raw layer does not support timeouts; nothing to forward.
            pass
1608

1609
    def can_recv_raw(self, timeout):
        """can_recv_raw(timeout) -> bool

        Should not be called directly. Returns True, if
        there is data available within the timeout, but
        ignores the buffer on the object.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
1618

1619
    def connected_raw(self, direction):
        """connected_raw(direction) -> bool

        Should not be called directly.  Returns True iff the
        tube is connected in the given direction.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
1627

1628
    def close(self):
        """close()

        Closes the tube.

        This base implementation does nothing.
        """
        pass
        # Ideally we could:
        # raise NotImplementedError()
        # But this causes issues with the unit tests.
1637

1638
    def fileno(self):
        """fileno() -> int

        Returns the file number used for reading.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
1645

1646
    def shutdown_raw(self, direction):
        """shutdown_raw(direction)

        Should not be called directly.  Closes the tube for further reading or
        writing.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
1654

1655

1656
    def p64(self, *a, **kw):
        """Send the result of ``packing.p64(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.p64(*a, **kw)
        return self.send(payload)
1!
1657
    def p32(self, *a, **kw):
        """Send the result of ``packing.p32(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.p32(*a, **kw)
        return self.send(payload)
1!
1658
    def p16(self, *a, **kw):
        """Send the result of ``packing.p16(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.p16(*a, **kw)
        return self.send(payload)
1!
1659
    def p8(self, *a, **kw):
        """Send the result of ``packing.p8(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.p8(*a, **kw)
        return self.send(payload)
1!
1660
    def pack(self, *a, **kw):
        """Send the result of ``packing.pack(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.pack(*a, **kw)
        return self.send(payload)
1✔
1661

1662
    def u64(self, *a, **kw):
        """Return ``packing.u64`` applied to the result of ``self.recvn(8)``.

        Extra positional and keyword arguments are forwarded to
        ``packing.u64`` after the received data.
        """
        raw = self.recvn(8)
        return packing.u64(raw, *a, **kw)
1!
1663
    def u32(self, *a, **kw):
        """Return ``packing.u32`` applied to the result of ``self.recvn(4)``.

        Extra positional and keyword arguments are forwarded to
        ``packing.u32`` after the received data.
        """
        raw = self.recvn(4)
        return packing.u32(raw, *a, **kw)
1✔
1664
    def u16(self, *a, **kw):
        """Return ``packing.u16`` applied to the result of ``self.recvn(2)``.

        Extra positional and keyword arguments are forwarded to
        ``packing.u16`` after the received data.
        """
        raw = self.recvn(2)
        return packing.u16(raw, *a, **kw)
1!
1665
    def u8(self, *a, **kw):
        """Return ``packing.u8`` applied to the result of ``self.recvn(1)``.

        Extra positional and keyword arguments are forwarded to
        ``packing.u8`` after the received data.
        """
        raw = self.recvn(1)
        return packing.u8(raw, *a, **kw)
1!
1666
    def unpack(self, *a, **kw):
        """Return ``packing.unpack`` applied to ``self.recvn(context.bytes)``.

        The amount requested from :meth:`recvn` is ``context.bytes``.  Extra
        positional and keyword arguments are forwarded to ``packing.unpack``
        after the received data.
        """
        raw = self.recvn(context.bytes)
        return packing.unpack(raw, *a, **kw)
1✔
1667

1668
    def flat(self, *a, **kw):
        """Send the result of ``packing.flat(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.flat(*a, **kw)
        return self.send(payload)
1✔
1669
    def fit(self, *a, **kw):
        """Send the result of ``packing.fit(*a, **kw)``; returns what :meth:`send` returns."""
        payload = packing.fit(*a, **kw)
        return self.send(payload)
1!
1670

1671
    # Dynamic functions
1672

1673
    def make_wrapper(func):
        """Build the ``b`` and ``S`` variants of the method ``func``.

        Returns a ``(wrapperb, wrapperS)`` pair.  ``wrapperb`` converts the
        result of ``func`` to a ``bytearray``; ``wrapperS`` passes it through
        ``packing._decode``.  Each wrapper is renamed to ``func.__name__``
        plus its suffix and gets a docstring that points back at ``func``.
        """
        def wrapperS(self, *a, **kw):
            return packing._decode(func(self, *a, **kw))
        wrapperS.__name__ = func.__name__ + 'S'
        wrapperS.__doc__ = (
            'Same as :meth:`{func.__name__}`, but returns a str, '
            'decoding the result using `context.encoding`. '
            '(note that the binary versions are way faster)'
        ).format(func=func)

        def wrapperb(self, *a, **kw):
            return bytearray(func(self, *a, **kw))
        wrapperb.__name__ = func.__name__ + 'b'
        wrapperb.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a bytearray'.format(func=func)

        return wrapperb, wrapperS
1✔
1685

1686
    # Install a ``<name>b`` (bytearray) and ``<name>S`` (str) variant of each
    # receive method below directly into the class namespace.  The loop
    # variables ``func`` and ``wrapper`` are intentionally left bound here;
    # they are removed by the scope clean-up at the end of the class body.
    for func in (
        recv,
        recvn,
        recvall,
        recvrepeat,
        recvuntil,
        recvpred,
        recvregex,
        recvline,
        recvline_contains,
        recvline_startswith,
        recvline_endswith,
        recvline_regex,
    ):
        for wrapper in make_wrapper(func):
            locals()[wrapper.__name__] = wrapper
1✔
1700

1701
    def make_wrapper(func, alias):
        """Return a function named ``alias`` that forwards every call to ``func``.

        The returned wrapper passes ``self`` and all arguments straight
        through, and its docstring marks it as an alias for ``func``.
        """
        def wrapper(self, *a, **kw):
            return func(self, *a, **kw)

        wrapper.__name__ = alias
        wrapper.__doc__ = 'Alias for :meth:`{func.__name__}`'.format(func=func)
        return wrapper
1✔
1707

1708
    # Give every class attribute whose name contains 'recv' a 'read' alias
    # and every one containing 'send' a 'write' alias.  ``list(locals())``
    # snapshots the names first, because the loop adds to the namespace.
    for _name in list(locals()):
        if 'recv' not in _name and 'send' not in _name:
            continue
        if 'recv' in _name:
            _name2 = _name.replace('recv', 'read')
        else:
            _name2 = _name.replace('send', 'write')
        locals()[_name2] = make_wrapper(locals()[_name], _name2)
1✔
1716

1717
    # Clean up the scope: drop the helper and loop variables so they do not
    # remain as attributes of the class.
    del wrapper, func, make_wrapper
    del _name, _name2
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc