• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 11996306614

24 Nov 2024 12:54PM UTC coverage: 73.582% (+0.1%) from 73.442%
11996306614

push

github

web-flow
Merge branch 'dev' into add_ko_file_search_support

3796 of 6420 branches covered (59.13%)

62 of 65 new or added lines in 3 files covered. (95.38%)

3 existing lines in 2 files now uncovered.

13311 of 18090 relevant lines covered (73.58%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.25
/pwnlib/tubes/tube.py
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import
1✔
3
from __future__ import division
1✔
4

5
import abc
1✔
6
import logging
1✔
7
import os
1✔
8
import re
1✔
9
import six
1✔
10
import string
1✔
11
import subprocess
1✔
12
import sys
1✔
13
import threading
1✔
14
import time
1✔
15

16
from six.moves import range
1✔
17

18
from pwnlib import atexit
1✔
19
from pwnlib import term
1✔
20
from pwnlib.context import context
1✔
21
from pwnlib.log import Logger
1✔
22
from pwnlib.timeout import Timeout
1✔
23
from pwnlib.tubes.buffer import Buffer
1✔
24
from pwnlib.util import fiddling
1✔
25
from pwnlib.util import iters
1✔
26
from pwnlib.util import misc
1✔
27
from pwnlib.util import packing
1✔
28

29

30
class tube(Timeout, Logger):
1✔
31
    """
32
    Container of all the tube functions common to sockets, TTYs and SSH connections.
33
    """
34

35
    default = Timeout.default
1✔
36
    forever = Timeout.forever
1✔
37

38
    def __init__(self, timeout = default, level = None, *a, **kw):
        """Initialise the timeout, the logger and the receive buffer.

        Arguments:
            timeout: Forwarded to the ``Timeout`` initialiser.
            level: Optional log level; applied with ``setLevel`` unless ``None``.
            a, kw: Forwarded unchanged to :class:`Buffer`.
        """
        super(tube, self).__init__(timeout)
        Logger.__init__(self, None)

        if level is not None:
            self.setLevel(level)

        # ``None`` means "no per-tube override"; see the ``newline`` property.
        self._newline = None
        self.buffer = Buffer(*a, **kw)

        # Hand close() to pwnlib's atexit registry.
        atexit.register(self.close)
48

49
    @property
    def newline(self):
        r'''Line terminator used by sendline(), recvline() and friends.

        A value assigned on the tube itself takes precedence; otherwise
        ``context.newline`` is used.

            >>> t = tube()
            >>> t.newline = b'X'
            >>> t.unrecv(b'A\nB\nCX')
            >>> t.recvline()
            b'A\nB\nCX'

            >>> t = tube()
            >>> context.newline = b'\r\n'
            >>> t.newline
            b'\r\n'

            # Clean up
            >>> context.clear()
        '''
        override = self._newline
        return context.newline if override is None else override
70

71
    @newline.setter
    def newline(self, newline):
        """Store a per-tube newline after passing it through ``packing._need_bytes``."""
        as_bytes = packing._need_bytes(newline)
        self._newline = as_bytes
74

75
    # Functions based on functions from subclasses
76
    def recv(self, numb = None, timeout = default):
        r"""recv(numb = 4096, timeout = default) -> bytes

        Returns whatever data is available, at most `numb` bytes, without
        waiting for the full amount to arrive.

        Buffered data is served first; the underlying transport is only
        read when the internal buffer is empty.

        Arguments:
            numb(int): Upper bound on the number of bytes returned.  The
                value is passed through ``buffer.get_fill_size``.
            timeout(int): Timeout for the operation

        Raises:
            exceptions.EOFError: The connection is closed

        Returns:
            A bytes object containing bytes received from the socket,
            or ``b''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> # Fake a data source
            >>> t.recv_raw = lambda n: b'Hello, world'
            >>> t.recv() == b'Hello, world'
            True
            >>> t.unrecv(b'Woohoo')
            >>> t.recv() == b'Woohoo'
            True
            >>> with context.local(log_level='debug'):
            ...    _ = t.recv() # doctest: +ELLIPSIS
            [...] Received 0xc bytes:
                b'Hello, world'
        """
        wanted = self.buffer.get_fill_size(numb)
        received = self._recv(wanted, timeout)
        if not received:
            return b''
        return received
109

110
    def unrecv(self, data):
        """unrecv(data)

        Pushes ``data`` back onto the front of the receive buffer, so the
        next receive call sees it first.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'hello'
            >>> t.recv()
            b'hello'
            >>> t.recv()
            b'hello'
            >>> t.unrecv(b'world')
            >>> t.recv()
            b'world'
            >>> t.recv()
            b'hello'
        """
        self.buffer.unget(packing._need_bytes(data))
132

133
    def _fillbuffer(self, timeout = default):
        """_fillbuffer(timeout = default)

        Performs a single :meth:`recv_raw` call and appends the result to
        the internal buffer.

        Returns:

            The data received.  When nothing was received, the falsy value
            returned by :meth:`recv_raw` is passed through unchanged.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda *a: b'abc'
            >>> len(t.buffer)
            0
            >>> t._fillbuffer()
            b'abc'
            >>> len(t.buffer)
            3
        """
        with self.local(timeout):
            chunk = self.recv_raw(self.buffer.get_fill_size())

        # Nothing arrived: no logging, nothing to buffer.
        if not chunk:
            return chunk

        if self.isEnabledFor(logging.DEBUG):
            self.debug('Received %#x bytes:' % len(chunk))
            self.maybe_hexdump(chunk, level=logging.DEBUG)

        self.buffer.add(chunk)
        return chunk
166

167

168
    def _recv(self, numb = None, timeout = default):
        """_recv(numb = 4096, timeout = default) -> str

        Returns one chunk of data from the internal buffer, filling the
        buffer with a single read first when it is empty.
        """
        size = self.buffer.get_fill_size(numb)

        if not self.buffer:
            # Buffer is empty; try one read and give up if nothing arrives
            # before the timeout.
            if not self._fillbuffer(timeout):
                return b''

        return self.buffer.get(size)
182

183
    def recvpred(self, pred, timeout = default):
        """recvpred(pred, timeout = default) -> bytes

        Receives one byte at a time from the tube, until ``pred(all_bytes)``
        evaluates to True.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        Any exception raised while receiving (including ``EOFError``) is
        handled the same way: the data read so far is put back into the
        buffer and ``b''`` is returned.  Exceptions raised by ``pred``
        itself are not caught.

        Arguments:
            pred(callable): Function to call, with the currently-accumulated data.
            timeout(int): Timeout for the operation

        Returns:
            A bytes object containing bytes received from the socket,
            or ``b''`` if a timeout or receive error occurred while waiting.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'abbbaccc'
            >>> pred = lambda p: p.count(b'a') == 2
            >>> t.recvpred(pred)
            b'abbba'
            >>> pred = lambda p: p.count(b'd') > 0
            >>> t.recvpred(pred, timeout=0.05)
            b''
        """

        data = b''

        with self.countdown(timeout):
            while not pred(data):
                if not self.countdown_active():
                    self.unrecv(data)
                    return b''

                try:
                    # Use the time remaining on the countdown rather than
                    # the full ``timeout``, so that a slow sender cannot
                    # stretch the total wait beyond what was requested.
                    # recvuntil() and recvn() do the same.
                    res = self.recv(1, timeout=self.timeout)
                except Exception:
                    self.unrecv(data)
                    return b''

                if not res:
                    # Timed out waiting for the next byte
                    self.unrecv(data)
                    return b''

                data += res

        return data
236

237
    def recvn(self, numb, timeout = default):
        """recvn(numb, timeout = default) -> bytes

        Receives exactly `numb` bytes.

        If fewer than `numb` bytes are available once ``timeout`` seconds
        have passed, everything received stays in the buffer and an empty
        byte string (``b''``) is returned.

        Raises:
            exceptions.EOFError: The connection closed before the request could be satisfied

        Returns:
            A bytes object containing bytes received from the socket,
            or ``b''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> data = b'hello world'
            >>> t.recv_raw = lambda *a: data
            >>> t.recvn(len(data)) == data
            True
            >>> t.recvn(len(data)+1) == data + data[:1]
            True
            >>> t.recv_raw = lambda *a: None
            >>> # The remaining data is buffered
            >>> t.recv() == data[1:]
            True
            >>> t.recv_raw = lambda *a: time.sleep(0.01) or b'a'
            >>> t.recvn(10, timeout=0.05)
            b''
            >>> t.recvn(10, timeout=0.06) # doctest: +ELLIPSIS
            b'aaaaaa...'
        """
        # Data accumulates directly in the tube buffer, so nothing has to
        # be put back when the timeout hits.
        with self.countdown(timeout):
            while True:
                if not self.countdown_active():
                    break
                if not len(self.buffer) < numb:
                    break
                if not self._fillbuffer(self.timeout):
                    break

        if len(self.buffer) < numb:
            return b''

        return self.buffer.get(numb)
282

283
    def recvuntil(self, delims, drop=False, timeout=default):
        """recvuntil(delims, drop=False, timeout=default) -> bytes

        Receive data until one of `delims` is encountered.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        Arguments:
            delims(bytes,tuple): A single delimiter, or an iterable of delimiters.
                When several delimiters match, the one found earliest in the
                data wins.
            drop(bool): Drop the ending.  If :const:`True` it is removed from the end of the return value.
            timeout(int): Timeout for the operation

        Raises:
            exceptions.EOFError: The connection closed before the request could be satisfied

        Returns:
            A bytes object containing bytes received from the socket,
            or ``b''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello World!"
            >>> t.recvuntil(b' ')
            b'Hello '
            >>> _=t.clean(0)
            >>> # Matches on 'o' in 'Hello'
            >>> t.recvuntil((b' ',b'W',b'o',b'r'))
            b'Hello'
            >>> _=t.clean(0)
            >>> # Matches expressly full string
            >>> t.recvuntil(b' Wor')
            b'Hello Wor'
            >>> _=t.clean(0)
            >>> # Matches on full string, drops match
            >>> t.recvuntil(b' Wor', drop=True)
            b'Hello'

            >>> # Try with regex special characters
            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello|World"
            >>> t.recvuntil(b'|', drop=True)
            b'Hello'

        """
        # Convert a single delimiter into a singleton tuple
        if isinstance(delims, (bytes, bytearray, six.text_type)):
            delims = (delims,)
        delims = tuple(map(packing._need_bytes, delims))

        # Longest delimiter for tracking purposes
        longest = max(map(len, delims))

        # ``data`` collects chunks that are no longer searched;
        # ``top`` is the tail that is still being searched.
        data = []
        top = b''

        with self.countdown(timeout):
            while self.countdown_active():
                try:
                    res = self.recv(timeout=self.timeout)
                except Exception:
                    self.unrecv(b''.join(data) + top)
                    raise

                if not res:
                    self.unrecv(b''.join(data) + top)
                    return b''

                top += res

                # Pick the delimiter that occurs earliest in ``top``
                start = len(top)
                for d in delims:
                    j = top.find(d)
                    if start > j > -1:
                        start = j
                        end = j + len(d)

                if start < len(top):
                    # Everything after the delimiter goes back to the buffer
                    self.unrecv(top[end:])
                    if drop:
                        top = top[:start]
                    else:
                        top = top[:end]
                    return b''.join(data) + top

                # Keep a tail long enough to catch a delimiter that is
                # split across two reads; set the rest aside.
                if len(top) > longest:
                    i = -longest - 1
                    data.append(top[:i])
                    top = top[i:]

        # The countdown ran out between two reads.  Put back what was
        # consumed so far so that no received data is lost.
        leftover = b''.join(data) + top
        if leftover:
            self.unrecv(leftover)

        return b''
372

373
    def recvlines(self, numlines=2**20, keepends=False, timeout=default):
        r"""recvlines(numlines, keepends=False, timeout=default) -> list of bytes objects

        Receive up to ``numlines`` lines.

        A "line" is any sequence of bytes terminated by the byte sequence
        set by :attr:`newline`, which defaults to ``b'\n'``.

        If a line cannot be received before ``timeout`` seconds pass,
        receiving stops and the lines collected so far are returned.

        Arguments:
            numlines(int): Maximum number of lines to receive
            keepends(bool): Keep newlines at the end of each line (:const:`False`).
            timeout(int): Maximum timeout

        Raises:
            exceptions.EOFError: The connection closed before the request could be satisfied

        Returns:
            A list of bytes objects, one per line received.  The list may
            hold fewer than ``numlines`` entries (or be empty) if a timeout
            occurred while waiting.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'\n'
            >>> t.recvlines(3)
            [b'', b'', b'']
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
            >>> t.recvlines(3)
            [b'Foo', b'Bar', b'Baz']
            >>> t.recvlines(3, True)
            [b'Foo\n', b'Bar\n', b'Baz\n']
        """
        lines = []
        with self.countdown(timeout):
            for _ in range(numlines):
                try:
                    # Always receive with the line ending attached so the
                    # exact bytes can be put back into the buffer if
                    # receiving fails part-way through.
                    res = self.recvline(keepends=True, timeout=timeout)
                except Exception:
                    self.unrecv(b''.join(lines))
                    raise

                if not res:
                    break

                lines.append(res)

        if not keepends:
            newline = self.newline
            if newline:
                # Remove exactly one trailing newline sequence.  rstrip()
                # would treat the newline as a set of characters and could
                # eat payload bytes with a multi-byte newline such as
                # b'\r\n'.
                cut = len(newline)
                lines = [line[:-cut] if line.endswith(newline) else line
                         for line in lines]

        return lines
429

430
    def recvlinesS(self, numlines=2**20, keepends=False, timeout=default):
        r"""recvlinesS(numlines, keepends=False, timeout=default) -> str list

        Same as :meth:`recvlines`, except that every line is passed through
        ``packing._decode`` before being returned.
        Prefer :meth:`recvlines` when the decoded form is not needed.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'\n'
            >>> t.recvlinesS(3)
            ['', '', '']
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
            >>> t.recvlinesS(3)
            ['Foo', 'Bar', 'Baz']
        """
        raw_lines = self.recvlines(numlines, keepends, timeout)
        return list(map(packing._decode, raw_lines))
448

449
    def recvlinesb(self, numlines=2**20, keepends=False, timeout=default):
        r"""recvlinesb(numlines, keepends=False, timeout=default) -> bytearray list

        Same as :meth:`recvlines`, except that every line is returned as a
        ``bytearray``.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'\n'
            >>> t.recvlinesb(3)
            [bytearray(b''), bytearray(b''), bytearray(b'')]
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
            >>> t.recvlinesb(3)
            [bytearray(b'Foo'), bytearray(b'Bar'), bytearray(b'Baz')]
        """
        raw_lines = self.recvlines(numlines, keepends, timeout)
        return list(map(bytearray, raw_lines))
465

466
    def recvline(self, keepends=True, timeout=default):
        r"""recvline(keepends=True, timeout=default) -> bytes

        Receive a single line from the tube.

        A "line" is any sequence of bytes terminated by the byte sequence
        set in :attr:`newline`, which defaults to ``b'\n'``.

        If the connection is closed (:class:`EOFError`) before a newline
        is received, the buffered data is returned by default and a warning
        is logged. If the buffer is empty, an :class:`EOFError` is raised.
        This behavior can be changed by setting :meth:`pwnlib.context.ContextType.throw_eof_on_incomplete_line`.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        Arguments:
            keepends(bool): Keep the line ending (:const:`True`).
            timeout(int): Timeout

        Raises:
            :class:`EOFError`: The connection closed before the request
                                 could be satisfied and the buffer is empty

        Return:
            All bytes received over the tube until the first
            newline is received.  Optionally retains
            the ending. If the connection is closed before a newline
            is received, the remaining data received up to this point
            is returned.


        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'Foo\nBar\r\nBaz\n'
            >>> t.recvline()
            b'Foo\n'
            >>> t.recvline()
            b'Bar\r\n'
            >>> t.recvline(keepends = False)
            b'Baz'
            >>> t.newline = b'\r\n'
            >>> t.recvline(keepends = False)
            b'Foo\nBar'
            >>> t = tube()
            >>> def _recv_eof(n):
            ...     if not _recv_eof.throw:
            ...         _recv_eof.throw = True
            ...         return b'real line\ntrailing data'
            ...     raise EOFError
            >>> _recv_eof.throw = False
            >>> t.recv_raw = _recv_eof
            >>> t.recvline()
            b'real line\n'
            >>> t.recvline()
            b'trailing data'
            >>> t.recvline() # doctest: +ELLIPSIS
            Traceback (most recent call last):
            ...
            EOFError
        """
        try:
            return self.recvuntil(self.newline, drop = not keepends, timeout = timeout)
        except EOFError:
            throw_eof = context.throw_eof_on_incomplete_line

            # Re-raise when asked to, or when there is nothing left to return.
            if throw_eof or not self.buffer.size > 0:
                raise

            # Warn only when the setting was left at ``None``.
            if throw_eof is None:
                self.warn_once('EOFError during recvline. Returning buffered data without trailing newline.')

            return self.buffer.get()
536

537
    def recvline_pred(self, pred, keepends=False, timeout=default):
        r"""recvline_pred(pred, keepends=False) -> bytes

        Receive data until ``pred(line)`` returns a truthy value.
        Drop all other data.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        Arguments:
            pred(callable): Function to call.  Returns the line for which
                this function returns :const:`True`.  The line passed to
                ``pred`` always includes its line ending, if it has one.
            keepends(bool): Return the line with its newline if :const:`True`
            timeout(int): Timeout, in seconds

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Foo\nBar\nBaz\n"
            >>> t.recvline_pred(lambda line: line == b"Bar\n")
            b'Bar'
            >>> t.recvline_pred(lambda line: line == b"Bar\n", keepends=True)
            b'Bar\n'
            >>> t.recvline_pred(lambda line: line == b'Nope!', timeout=0.1)
            b''
        """

        # Non-matching lines are parked here so they can be restored to
        # the tube buffer if no matching line shows up in time.
        tmpbuf = Buffer()
        line   = b''
        with self.countdown(timeout):
            while self.countdown_active():
                try:
                    line = self.recvline(keepends=True)
                except Exception:
                    self.buffer.unget(tmpbuf)
                    raise

                if not line:
                    self.buffer.unget(tmpbuf)
                    return b''

                if pred(line):
                    if not keepends:
                        # recvline() may return a final line without a
                        # newline when the connection closes; only strip
                        # the terminator when it is really there.
                        newline = self.newline
                        if newline and line.endswith(newline):
                            line = line[:-len(newline)]
                    return line

                tmpbuf.add(line)

        # The countdown expired between two lines: restore the skipped
        # lines, exactly as on the in-loop timeout path.
        self.buffer.unget(tmpbuf)
        return b''
584

585
    def recvline_contains(self, items, keepends = False, timeout = default):
        r"""
        Keep receiving lines until one contains at least one of `items`,
        and return that line.

        Arguments:
            items(str,tuple): List of strings to search for, or a single string.
            keepends(bool): Return lines with newlines if :const:`True`
            timeout(int): Timeout, in seconds

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
            >>> t.recvline_contains(b'r')
            b'World'
            >>> f = lambda n: b"cat dog bird\napple pear orange\nbicycle car train\n"
            >>> t = tube()
            >>> t.recv_raw = f
            >>> t.recvline_contains(b'pear')
            b'apple pear orange'
            >>> t = tube()
            >>> t.recv_raw = f
            >>> t.recvline_contains((b'car', b'train'))
            b'bicycle car train'
        """
        # A lone string counts as a single search term
        if isinstance(items, (bytes, bytearray, six.text_type)):
            items = (items,)
        needles = tuple(map(packing._need_bytes, items))

        return self.recvline_pred(
            lambda line: any(needle in line for needle in needles),
            keepends,
            timeout,
        )
619

620
    def recvline_startswith(self, delims, keepends=False, timeout=default):
        r"""recvline_startswith(delims, keepends=False, timeout=default) -> bytes

        Keep receiving lines until one begins with one of `delims`, and
        return that line.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        Arguments:
            delims(str,tuple): List of prefixes to search for, or a single prefix
            keepends(bool): Return lines with newlines if :const:`True`
            timeout(int): Timeout, in seconds

        Returns:
            The first line received which starts with a delimiter in ``delims``.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'))
            b'World'
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'), True)
            b'Xylophone\n'
            >>> t.recvline_startswith(b'Wo')
            b'World'
        """
        # Convert a single string into a singleton tuple
        if isinstance(delims, (bytes, bytearray, six.text_type)):
            delims = (delims,)
        prefixes = tuple(map(packing._need_bytes, delims))

        def starts_with_any(line):
            return any(line.startswith(prefix) for prefix in prefixes)

        return self.recvline_pred(starts_with_any,
                                  keepends=keepends,
                                  timeout=timeout)
656

657
    def recvline_endswith(self, delims, keepends=False, timeout=default):
        r"""recvline_endswith(delims, keepends=False, timeout=default) -> bytes

        Keep receiving lines until one ends with one of `delims` (directly
        followed by the newline), and return that line.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        See :meth:`recvline_startswith` for more details.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\nKaboodle\n'
            >>> t.recvline_endswith(b'r')
            b'Bar'
            >>> t.recvline_endswith((b'a',b'b',b'c',b'd',b'e'), True)
            b'Kaboodle\n'
            >>> t.recvline_endswith(b'oodle')
            b'Kaboodle'
        """
        # Convert a single string into a singleton tuple
        if isinstance(delims, (bytes, bytearray, six.text_type)):
            delims = (delims,)

        # Lines are matched with their newline attached, so each suffix
        # gets the newline appended.
        suffixes = tuple(packing._need_bytes(delim) + self.newline for delim in delims)

        def ends_with_any(line):
            return any(line.endswith(suffix) for suffix in suffixes)

        return self.recvline_pred(ends_with_any,
                                  keepends=keepends,
                                  timeout=timeout)
688

689
    def recvregex(self, regex, exact=False, timeout=default, capture=False):
        r"""recvregex(regex, exact=False, timeout=default, capture=False) -> bytes

        Wrapper around :func:`recvpred` that keeps receiving until a regex
        matches the data received so far.

        Returns all received data up until the regex matched. If `capture` is
        set to True, the result of applying the regex to that data is
        returned instead of the data itself.

        By default the pattern's ``search`` method is used; with `exact`
        set to True its ``match`` method is used instead.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'The lucky number is 1337 as always\nBla blubb blargh\n'
            >>> m = t.recvregex(br'number is ([0-9]+) as always\n', capture=True)
            >>> m.group(1)
            b'1337'
            >>> t.recvregex(br'Bla .* blargh\n')
            b'Bla blubb blargh\n'
        """

        # Pattern given as a string: compile it; otherwise use it as-is.
        if isinstance(regex, (bytes, bytearray, six.text_type)):
            regex = re.compile(packing._need_bytes(regex))

        pred = regex.match if exact else regex.search

        received = self.recvpred(pred, timeout = timeout)
        if capture:
            return pred(received)
        return received
728

729
    def recvline_regex(self, regex, exact=False, keepends=False, timeout=default):
        """recvline_regex(regex, exact=False, keepends=False, timeout=default) -> bytes

        Wrapper around :func:`recvline_pred` that returns the first line
        matched by a regex.

        By default the pattern's ``search`` method is used; with `exact`
        set to True its ``match`` method is used instead.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty byte string (``b''``) is returned.
        """

        # Pattern given as a string: compile it; otherwise use it as-is.
        if isinstance(regex, (bytes, bytearray, six.text_type)):
            regex = re.compile(packing._need_bytes(regex))

        matcher = regex.match if exact else regex.search

        return self.recvline_pred(matcher, keepends = keepends, timeout = timeout)
752

753
    def recvrepeat(self, timeout=default):
        """recvrepeat(timeout=default) -> bytes

        Keeps reading until a read yields nothing (timeout) or the tube
        reports EOF, then returns everything that is buffered.

        Examples:

            >>> data = [
            ... b'd',
            ... b'', # simulate timeout
            ... b'c',
            ... b'b',
            ... b'a',
            ... ]
            >>> def delayrecv(n, data=data):
            ...     return data.pop()
            >>> t = tube()
            >>> t.recv_raw = delayrecv
            >>> t.recvrepeat(0.2)
            b'abc'
            >>> t.recv()
            b'd'
        """

        try:
            got_data = True
            while got_data:
                got_data = self._fillbuffer(timeout=timeout)
        except EOFError:
            # EOF simply ends the collection; return what we have.
            pass

        return self.buffer.get()
784

785
    def recvall(self, timeout=Timeout.forever):
        """recvall(timeout=Timeout.forever) -> bytes

        Reads until EOF (or until a read yields nothing), closes the tube
        and returns everything that was buffered.
        """

        with self.waitfor('Receiving all data') as progress:
            size = len(self.buffer)
            with self.local(timeout):
                try:
                    while True:
                        # Report the buffered amount before each read
                        size = misc.size(len(self.buffer))
                        progress.status(size)
                        if not self._fillbuffer():
                            break
                except EOFError:
                    pass
            progress.success("Done (%s)" % size)

        self.close()
        return self.buffer.get()
806

807
    def send(self, data):
        """send(data)

        Sends data.

        If log level ``DEBUG`` is enabled, the data being sent is also
        logged.

        If it is not possible to send anymore because of a closed
        connection, it raises ``exceptions.EOFError``

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.send(b'hello')
            b'hello'
        """

        payload = packing._need_bytes(data)

        debugging = self.isEnabledFor(logging.DEBUG)
        if debugging:
            self.debug('Sent %#x bytes:' % len(payload))
            self.maybe_hexdump(payload, level=logging.DEBUG)

        self.send_raw(payload)
834

835
    def sendline(self, line=b''):
        r"""sendline(data)

        Sends ``data`` followed by the tube's current :attr:`newline`, i.e.
        ``t.send(data + t.newline)``.

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.sendline(b'hello')
            b'hello\n'
            >>> t.newline = b'\r\n'
            >>> t.sendline(b'hello')
            b'hello\r\n'
        """

        # Convert first so the concatenation with the newline is bytes + bytes.
        self.send(packing._need_bytes(line) + self.newline)
1✔
855

856
    def sendlines(self, lines=()):
        """sendlines(lines)

        Sends every element of ``lines`` with :meth:`sendline`, in order.

        Arguments:
            lines(iterable): The lines to send; each one is passed through
                ``packing._need_bytes`` before being sent.  An empty
                iterable (the default) sends nothing.
        """
        # The default is an immutable tuple rather than a shared list, so it
        # can never be mutated across calls.
        for line in lines:
            line = packing._need_bytes(line)
            self.sendline(line)
×
860

861
    def sendafter(self, delim, data, timeout = default):
        """sendafter(delim, data, timeout = default) -> str

        A combination of ``recvuntil(delim, timeout=timeout)`` and ``send(data)``.

        ``data`` is passed through ``packing._need_bytes`` before anything is
        received, so an unusable argument fails before the tube is read.

        Returns:
            The value returned by ``recvuntil``.
        """

        data = packing._need_bytes(data)
        res = self.recvuntil(delim, timeout=timeout)
        # Send unconditionally, whatever recvuntil returned.
        self.send(data)
        return res
×
871

872
    def sendlineafter(self, delim, data, timeout = default):
        """sendlineafter(delim, data, timeout = default) -> str

        A combination of ``recvuntil(delim, timeout=timeout)`` and ``sendline(data)``.

        ``data`` is passed through ``packing._need_bytes`` before anything is
        received.

        Returns:
            The value returned by ``recvuntil``.
        """

        data = packing._need_bytes(data)
        res = self.recvuntil(delim, timeout=timeout)
        # Send unconditionally, whatever recvuntil returned.
        self.sendline(data)
        return res
1✔
881

882
    def sendthen(self, delim, data, timeout = default):
        """sendthen(delim, data, timeout = default) -> str

        A combination of ``send(data)`` and ``recvuntil(delim, timeout=timeout)``.

        Returns:
            The value returned by ``recvuntil``.
        """

        data = packing._need_bytes(data)
        # Send first, then wait for the delimiter in the response.
        self.send(data)
        return self.recvuntil(delim, timeout=timeout)
×
890

891
    def sendlinethen(self, delim, data, timeout = default):
        """sendlinethen(delim, data, timeout = default) -> str

        A combination of ``sendline(data)`` and ``recvuntil(delim, timeout=timeout)``.

        Returns:
            The value returned by ``recvuntil``.
        """

        data = packing._need_bytes(data)
        # Send first, then wait for the delimiter in the response.
        self.sendline(data)
        return self.recvuntil(delim, timeout=timeout)
×
899

900
    def interactive(self, prompt = term.text.bold_red('$') + ' '):
        """interactive(prompt = pwnlib.term.text.bold_red('$') + ' ')

        Does simultaneous reading and writing to the tube. In principle this just
        connects the tube to standard in and standard out, but in practice this
        is much more usable, since we are using :mod:`pwnlib.term` to print a
        floating prompt.

        The floating prompt is only used while in :data:`pwnlib.term.term_mode`;
        otherwise standard input is read one byte at a time.

        A background thread copies received data to standard out while the
        calling thread forwards input to the tube.  The method returns once
        input is exhausted, sending fails with ``EOFError`` or the user
        interrupts with Ctrl-C, and the background thread has finished.

        Arguments:
            prompt(str): Prompt shown by the terminal readline in term mode.
        """

        self.info('Switching to interactive mode')

        # Set when the interactive session should end; polled by both loops.
        go = threading.Event()
        def recv_thread():
            while not go.is_set():
                try:
                    # Short timeout so the stop flag is re-checked frequently.
                    cur = self.recv(timeout = 0.05)
                    cur = cur.replace(self.newline, b'\n')
                    if cur:
                        stdout = sys.stdout
                        if not term.term_mode:
                            # Prefer the binary layer of stdout when it exists.
                            stdout = getattr(stdout, 'buffer', stdout)
                        stdout.write(cur)
                        stdout.flush()
                except EOFError:
                    self.info('Got EOF while reading in interactive')
                    break

        t = context.Thread(target = recv_thread)
        t.daemon = True
        t.start()

        from pwnlib.args import term_mode
        try:
            os_linesep = os.linesep.encode()
            # Remaining bytes of a multi-byte OS line separator whose first
            # byte(s) have been read and withheld from the tube.
            to_skip = b''
            while not go.is_set():
                if term.term_mode:
                    data = term.readline.readline(prompt = prompt, float = True)
                    if data.endswith(b'\n') and self.newline != b'\n':
                        data = data[:-1] + self.newline
                else:
                    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
                    data = stdin.read(1)
                    # Keep OS's line separator if NOTERM is set and
                    # the user did not specify a custom newline
                    # even if stdin is a tty.
                    if sys.stdin.isatty() and (
                        term_mode
                        or context.newline != b"\n"
                        or self._newline is not None
                    ):
                        if to_skip:
                            if to_skip[:1] != data:
                                # Not a line separator after all: emit the
                                # withheld prefix together with this byte and
                                # leave the "inside a separator" state, so the
                                # prefix is not re-emitted before later bytes.
                                data = os_linesep[: -len(to_skip)] + data
                                to_skip = b''
                            else:
                                to_skip = to_skip[1:]
                                if to_skip:
                                    continue
                                data = self.newline
                        # If we observe a prefix of the line separator in a tty,
                        # assume we'll see the rest of it immediately after.
                        # This could stall until the next character is seen if
                        # the line separator is started but never finished, but
                        # that is unlikely to happen in a dynamic tty.
                        elif data and os_linesep.startswith(data):
                            if len(os_linesep) > 1:
                                to_skip = os_linesep[1:]
                                continue
                            data = self.newline

                if data:
                    try:
                        self.send(data)
                    except EOFError:
                        go.set()
                        self.info('Got EOF while sending in interactive')
                else:
                    # Empty read: input is exhausted, end the session.
                    go.set()
        except KeyboardInterrupt:
            self.info('Interrupted')
            go.set()

        # Join in short slices rather than with one unbounded join().
        while t.is_alive():
            t.join(timeout = 0.1)
1✔
986

987
    def stream(self, line_mode=True):
        """stream(line_mode=True)

        Receive data until the tube exits, and print it to stdout.

        Similar to :func:`interactive`, except that no input is sent.

        Similar to ``print(tube.recvall())`` except that data is printed
        as it is received, rather than after all data is received.

        Receiving stops on ``EOFError`` or when interrupted with Ctrl-C.

        Arguments:
            line_mode(bool): Whether to receive line-by-line or raw data.

        Returns:
            All data printed.
        """
        buf = Buffer()
        function = self.recvline if line_mode else self.recv
        try:
            while True:
                buf.add(function())
                stdout = sys.stdout
                if not term.term_mode:
                    # Prefer the binary layer of stdout when it exists.
                    stdout = getattr(stdout, 'buffer', stdout)
                stdout.write(buf.data[-1])
                # Flush after every chunk so output really appears as it is
                # received, matching what interactive() does.
                stdout.flush()
        except (KeyboardInterrupt, EOFError):
            pass

        return buf.get()
×
1018

1019
    def clean(self, timeout = 0.05):
        """clean(timeout = 0.05)

        Drains the tube: everything already buffered plus whatever
        :meth:`pwnlib.tubes.tube.tube.recvrepeat` collects within ``timeout``.

        If ``timeout`` is zero, only cached data will be cleared.

        Note: If timeout is set to zero, the underlying network is
        not actually polled; only the internal buffer is cleared.

        Returns:

            All data received

        Examples:

            >>> t = tube()
            >>> t.unrecv(b'clean me up')
            >>> t.clean(0)
            b'clean me up'
            >>> len(t.buffer)
            0
        """
        buffer_only = (timeout == 0)
        return self.buffer.get() if buffer_only else self.recvrepeat(timeout)
1✔
1047

1048
    def clean_and_log(self, timeout = 0.05):
        r"""clean_and_log(timeout = 0.05)

        Works like :meth:`pwnlib.tubes.tube.tube.clean`, but runs with the
        context log level forced to ``debug`` so that the drained data is
        logged.  Data that was already sitting in the buffer is logged here
        explicitly, unless debug logging was enabled anyway.

        Returns:

            All data received

        Examples:

            >>> def recv(n, data=[b'', b'hooray_data']):
            ...     while data: return data.pop()
            >>> t = tube()
            >>> t.recv_raw      = recv
            >>> t.connected_raw = lambda d: True
            >>> t.fileno        = lambda: 1234
            >>> with context.local(log_level='info'):
            ...     data = t.clean_and_log() #doctest: +ELLIPSIS
            [DEBUG] Received 0xb bytes:
                b'hooray_data'
            >>> data
            b'hooray_data'
            >>> context.clear()
        """
        pending = self.buffer.get()
        if pending:
            if not self.isEnabledFor(logging.DEBUG):
                with context.local(log_level='debug'):
                    self.debug('Received %#x bytes:' % len(pending))
                    self.maybe_hexdump(pending, level=logging.DEBUG)

        with context.local(log_level='debug'):
            freshly_read = self.clean(timeout)
            return pending + freshly_read
1✔
1081

1082
    def upload_manually(self, data, target_path = './payload', prompt = b'$', chunk_size = 0x200, chmod_flags = 'u+x', compression='auto', end_marker = 'PWNTOOLS_DONE'):
        """upload_manually(data, target_path = './payload', prompt = b'$', chunk_size = 0x200, chmod_flags = 'u+x', compression='auto', end_marker = 'PWNTOOLS_DONE')

        Upload a file manually using base64 encoding and compression.
        This can be used when the tube is connected to a shell.

        The file is uploaded in base64-encoded chunks by appending to a file
        and then decompressing it:

        ```
        loop:
            echo <chunk> | base64 -d >> <target_path>.<compression>
        <compression> -d -f <target_path>.<compression>
        chmod <chmod_flags> <target_path>
        ```

        It is assumed that a `base64` command is available on the target system.
        When ``compression`` is ``auto`` the best compression utility available
        between ``gzip`` and ``xz`` is chosen with a fallback to uncompressed
        upload.

        Arguments:

            data(bytes): The data to upload.
            target_path(str): The path to upload the data to.
            prompt(bytes): The shell prompt to wait for.
            chunk_size(int): The size of each chunk to upload.
            chmod_flags(str): The flags to use with chmod. ``""`` to ignore.
            compression(str): The compression to use. ``auto`` to automatically choose the best compression or ``gzip`` or ``xz``.
            end_marker(str): The marker to use to detect the end of the output. Only used when prompt is not set.

        Examples:

        >>> l = listen()
        >>> l.spawn_process('/bin/sh')
        >>> r = remote('127.0.0.1', l.lport)
        >>> r.upload_manually(b'some\\xca\\xfedata\\n', prompt=b'', chmod_flags='')
        >>> r.sendline(b'cat ./payload')
        >>> r.recvline()
        b'some\\xca\\xfedata\\n'

        >>> r.upload_manually(cyclic(0x1000), target_path='./cyclic_pattern', prompt=b'', chunk_size=0x10, compression='gzip')
        >>> r.sendline(b'sha256sum ./cyclic_pattern')
        >>> r.recvlineS(keepends=False).startswith(sha256sumhex(cyclic(0x1000)))
        True

        >>> blob = ELF.from_assembly(shellcraft.echo('Hello world!\\n') + shellcraft.exit(0))
        >>> r.upload_manually(blob.data, prompt=b'')
        >>> r.sendline(b'./payload')
        >>> r.recvline()
        b'Hello world!\\n'
        >>> r.close()
        >>> l.close()
        """
        # Without a prompt, every command echoes `end_marker` when it is done
        # and that echo is what we synchronise on; with a prompt we simply
        # wait for the prompt itself.
        echo_end = ""
        if not prompt:
            echo_end = "; echo {}".format(end_marker)
            end_markerb = end_marker.encode()
        else:
            end_markerb = prompt

        # Detect available compression utility, fallback to uncompressed upload.
        compression_mode = None
        possible_compression = ['gzip']
        if six.PY3:
            possible_compression.insert(0, 'xz')
        if not prompt:
            # Prime the stream with one marker so the first sendlineafter()
            # below has something to wait for.
            self.sendline("echo {}".format(end_marker).encode())
        if compression == 'auto':
            for utility in possible_compression:
                self.sendlineafter(end_markerb, "command -v {} && echo YEP || echo NOPE{}".format(utility, echo_end).encode())
                result = self.recvuntil([b'YEP', b'NOPE'])
                if b'YEP' in result:
                    compression_mode = utility
                    break
        elif compression in possible_compression:
            compression_mode = compression
        else:
            self.error('Invalid compression mode: %s, has to be one of %s', compression, possible_compression)

        self.debug('Manually uploading using compression mode: %s', compression_mode)

        compressed_data = b''
        if compression_mode == 'xz':
            import lzma
            compressed_data = lzma.compress(data, format=lzma.FORMAT_XZ, preset=9)
            compressed_path = target_path + '.xz'
        elif compression_mode == 'gzip':
            import gzip
            from six import BytesIO
            f = BytesIO()
            with gzip.GzipFile(fileobj=f, mode='wb', compresslevel=9) as g:
                g.write(data)
            compressed_data = f.getvalue()
            compressed_path = target_path + '.gz'
        else:
            compressed_path = target_path

        # Don't compress if it doesn't reduce the size.
        if len(compressed_data) >= len(data):
            compression_mode = None
            compressed_path = target_path
        else:
            data = compressed_data

        # Upload data in `chunk_size` chunks. Assume base64 is available.
        # Ceiling division: an exact multiple of chunk_size must not be
        # reported as having one extra chunk.
        total_chunks = (len(data) + chunk_size - 1) // chunk_size
        with self.progress('Uploading payload') as p:
            for idx, chunk in enumerate(iters.group(chunk_size, data)):
                # The final group is padded with None up to chunk_size.
                if None in chunk:
                    chunk = chunk[:chunk.index(None)]
                # The first chunk truncates/creates the file, later ones append.
                redirect = '>' if idx == 0 else '>>'
                self.sendlineafter(end_markerb, "echo {} | base64 -d {} {}{}".format(fiddling.b64e(bytearray(chunk)), redirect, compressed_path, echo_end).encode())
                p.status('{}/{} {}'.format(idx+1, total_chunks, misc.size(idx*chunk_size + len(chunk))))
            p.success(misc.size(len(data)))

        # Decompress the file and set the permissions.
        if compression_mode is not None:
            self.sendlineafter(end_markerb, '{} -d -f {}{}'.format(compression_mode, compressed_path, echo_end).encode())
        if chmod_flags:
            self.sendlineafter(end_markerb, 'chmod {} {}{}'.format(chmod_flags, target_path, echo_end).encode())
        if not prompt:
            # Consume the marker echoed by the last command.
            self.recvuntil(end_markerb + b'\n')
1✔
1206

1207
    def connect_input(self, other):
        """connect_input(other)

        Connects the input of this tube to the output of another tube object.

        A daemon thread is started that repeatedly receives from ``other``
        and sends the data on this tube.  When the pump loop ends, this tube
        is shut down for sending and ``other`` for receiving.


        Examples:

            >>> def p(x): print(x.decode())
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> import time
            >>> _=(b.connect_input(a), time.sleep(0.1))
            data
        """

        def pump():
            # NOTE(review): `_sys` is presumably a guard against running
            # during interpreter shutdown; as a local binding it looks like
            # it stays truthy - confirm whether these checks can ever fire.
            import sys as _sys
            while self.countdown_active():
                # Stop as soon as either end can no longer take part.
                if not (self.connected('send') and other.connected('recv')):
                    break

                try:
                    # Short timeout so the connection state is re-checked often.
                    data = other.recv(timeout = 0.05)
                except EOFError:
                    break

                if not _sys:
                    return

                # Nothing arrived within the timeout; poll again.
                if not data:
                    continue

                try:
                    self.send(data)
                except EOFError:
                    break

                if not _sys:
                    return

            # Reached via `break` or when the countdown expires (not via the
            # early returns above): close both halves of the link.
            self.shutdown('send')
            other.shutdown('recv')

        t = context.Thread(target = pump)
        t.daemon = True
        t.start()
1✔
1263

1264
    def connect_output(self, other):
        """connect_output(other)

        Connects the output of this tube to the input of another tube object.

        Implemented as ``other.connect_input(self)``.

        Examples:

            >>> def p(x): print(repr(x))
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> _=(a.connect_output(b), time.sleep(0.1))
            b'data'
        """

        other.connect_input(self)
1✔
1288

1289
    def connect_both(self, other):
        """connect_both(other)

        Connects the both ends of this tube object with another tube object.

        Calls :meth:`connect_input` and then :meth:`connect_output` with
        ``other``, so data is forwarded in both directions.
        """

        self.connect_input(other)
        self.connect_output(other)
1✔
1296

1297
    def spawn_process(self, *args, **kwargs):
        """Spawns a new process having this tube as stdin, stdout and stderr.

        Takes the same arguments as :class:`subprocess.Popen`.

        ``stdin``, ``stdout`` and ``stderr`` are all set to :meth:`fileno`
        of this tube.

        Returns:
            The :class:`subprocess.Popen` object that was created.
        """

        return subprocess.Popen(
            *args,
            stdin = self.fileno(),
            stdout = self.fileno(),
            stderr = self.fileno(),
            **kwargs
        )
1309

1310
    def __lshift__(self, other):
        """
        Shorthand for connecting multiple tubes.

        See :meth:`connect_input` for more information.

        Examples:

            The following are equivalent ::

                tube_a << tube_b
                tube_a.connect_input(tube_b)

            This is useful when chaining multiple tubes ::

                tube_a << tube_b << tube_a
                tube_a.connect_input(tube_b)
                tube_b.connect_input(tube_a)
        """
        self.connect_input(other)
        # Return the right-hand operand so chained `<<` continues from it.
        return other
×
1331

1332
    def __rshift__(self, other):
        """
        Inverse of the ``<<`` operator.  See :meth:`__lshift__`.

        ``a >> b`` calls ``a.connect_output(b)`` and evaluates to ``b``.

        See :meth:`connect_input` for more information.
        """
        self.connect_output(other)
        # Return the right-hand operand so chained `>>` continues from it.
        return other
×
1340

1341
    def __ne__(self, other):
        """
        Shorthand for connecting tubes to each other.

        The following are equivalent ::

            a << b << a
            a <> b

        Note that this overrides the inequality operator: no comparison is
        performed and ``None`` is returned.

        See :meth:`connect_input` for more information.
        """
        self << other << self
×
1353

1354
    def wait_for_close(self, timeout=default):
        """wait_for_close(timeout = default)

        Blocks until :meth:`connected` reports that the tube is closed or
        the countdown started with ``timeout`` is no longer active,
        polling at most every 0.05 seconds.
        """

        with self.countdown(timeout):
            # Keep sleeping in short slices while time remains and the tube
            # is still connected.
            while self.countdown_active() and self.connected():
                time.sleep(min(self.timeout, 0.05))

    wait = wait_for_close
1✔
1364

1365
    def can_recv(self, timeout = 0):
        """can_recv(timeout = 0) -> bool

        Returns True, if there is data available within `timeout` seconds.

        Buffered data counts as available; the raw layer is only asked when
        the buffer is empty.

        Examples:

            >>> import time
            >>> t = tube()
            >>> t.can_recv_raw = lambda *a: False
            >>> t.can_recv()
            False
            >>> _=t.unrecv(b'data')
            >>> t.can_recv()
            True
            >>> _=t.recv()
            >>> t.can_recv()
            False
        """

        if self.buffer:
            return True
        return bool(self.can_recv_raw(timeout))
1✔
1386

1387
    def settimeout(self, timeout):
        """settimeout(timeout)

        Set the timeout for receiving operations. If the string "default"
        is given, then :data:`context.timeout` will be used. If None is given,
        then there will be no timeout.

        This only assigns ``self.timeout``; NOTE(review): the handling of
        "default" and ``None`` presumably lives in the ``timeout`` attribute
        inherited from ``Timeout`` - confirm there.

        Examples:

            >>> t = tube()
            >>> t.settimeout_raw = lambda t: None
            >>> t.settimeout(3)
            >>> t.timeout == 3
            True
        """

        self.timeout = timeout
1✔
1404

1405

1406
    # Every accepted spelling of a direction, mapped to its canonical name.
    shutdown_directions = dict(
        [(alias, 'recv') for alias in ('in', 'read', 'recv')] +
        [(alias, 'send') for alias in ('out', 'write', 'send')]
    )

    # connected() accepts the same spellings plus the catch-all 'any'.
    connected_directions = dict(shutdown_directions, any='any')
1✔
1417

1418
    def shutdown(self, direction = "send"):
        """shutdown(direction = "send")

        Closes the tube for further reading or writing depending on `direction`.

        Arguments:
          direction(str): Which direction to close; "in", "read" or "recv"
            closes the tube in the ingoing direction, "out", "write" or "send"
            closes it in the outgoing direction.

        Returns:
          :const:`None`

        Raises:
          KeyError: If `direction` is not one of the accepted names.

        Examples:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.shutdown_raw = p
            >>> _=list(map(t.shutdown, ('in', 'read', 'recv', 'out', 'write', 'send')))
            recv
            recv
            recv
            send
            send
            send
            >>> t.shutdown('bad_value') #doctest: +ELLIPSIS
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['in', 'out', 'read', 'recv', 'send', 'write']"
        """
        try:
            direction = self.shutdown_directions[direction]
        except KeyError:
            raise KeyError('direction must be in %r' % sorted(self.shutdown_directions))
        else:
            # `direction` is already canonical here; pass it straight on
            # instead of looking it up in the table a second time.
            self.shutdown_raw(direction)
1✔
1454

1455
    def connected(self, direction = 'any'):
        """connected(direction = 'any') -> bool

        Reports whether the tube is connected in the requested direction by
        normalising `direction` and asking :meth:`connected_raw`.

        Arguments:
          direction(str): Can be the string 'any', 'in', 'read', 'recv',
                          'out', 'write', 'send'.

        Doctest:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.connected_raw = p
            >>> _=list(map(t.connected, ('any', 'in', 'read', 'recv', 'out', 'write', 'send')))
            any
            recv
            recv
            recv
            send
            send
            send
            >>> t.connected('bad_value') #doctest: +ELLIPSIS
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['any', 'in', 'out', 'read', 'recv', 'send', 'write']"
        """
        known = self.connected_directions
        if direction not in known:
            raise KeyError('direction must be in %r' % sorted(known))

        canonical = known[direction]
        return self.connected_raw(canonical)
1✔
1488

1489
    def __enter__(self):
        """Permit use of 'with' to control scoping and closing sessions.

        Returns the tube itself, so ``with tube(...) as t`` binds the tube.

        Examples:

            >>> t = tube()
            >>> def p(x): print(x)
            >>> t.close = lambda: p("Closed!")
            >>> with t: pass
            Closed!
        """
        return self
1✔
1501

1502
    def __exit__(self, type, value, traceback):
        """Handles closing for 'with' statement

        Always calls :meth:`close`.  Nothing is returned, so an exception
        raised inside the ``with`` body is not suppressed.

        See :meth:`__enter__`
        """
        self.close()
1✔
1508

1509
    # The minimal interface to be implemented by a child
1510
    @abc.abstractmethod
    def recv_raw(self, numb):
        """recv_raw(numb) -> str

        Should not be called directly. Receives data without using the buffer
        on the object.

        Unless there is a timeout or closed connection, this should always
        return data. In case of a timeout, it should return None, in case
        of a closed connection it should raise an ``exceptions.EOFError``.

        This base implementation always raises ``EOFError``.
        """

        raise EOFError('Not implemented')
1✔
1523

1524
    @abc.abstractmethod
    def send_raw(self, data):
        """send_raw(data)

        Should not be called directly. Sends data to the tube.

        Should raise ``exceptions.EOFError``, if it is unable to send any
        more, because of a closed tube.

        This base implementation always raises ``EOFError``.
        """

        raise EOFError('Not implemented')
×
1535

1536
    def settimeout_raw(self, timeout):
        """settimeout_raw(timeout)

        Should not be called directly. Sets the timeout for
        the tube.

        This base implementation raises ``NotImplementedError``, which
        :meth:`timeout_change` catches and ignores.
        """

        raise NotImplementedError()
1✔
1544

1545
    def timeout_change(self):
        """
        Should not be called directly. Informs the raw layer of the tube that the timeout has changed.

        Forwards ``self.timeout`` to :meth:`settimeout_raw`.

        Inherited from :class:`Timeout`.
        """
        try:
            self.settimeout_raw(self.timeout)
        except NotImplementedError:
            # A tube without a raw timeout hook: nothing to propagate.
            pass
1✔
1556

1557
    def can_recv_raw(self, timeout):
        """can_recv_raw(timeout) -> bool

        Should not be called directly. Returns True, if
        there is data available within the timeout, but
        ignores the buffer on the object.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1566

1567
    def connected_raw(self, direction):
        """connected_raw(direction) -> bool

        Should not be called directly.  Returns True iff the
        tube is connected in the given direction.

        :meth:`connected` passes one of the canonical names ``'any'``,
        ``'recv'`` or ``'send'``.  This base implementation raises
        ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1575

1576
    def close(self):
        """close()

        Closes the tube.

        This base implementation does nothing.
        """
        pass
        # Ideally we could:
        # raise NotImplementedError()
        # But this causes issues with the unit tests.
1585

1586
    def fileno(self):
        """fileno() -> int

        Returns the file number used for reading.

        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1593

1594
    def shutdown_raw(self, direction):
        """shutdown_raw(direction)

        Should not be called directly.  Closes the tube for further reading or
        writing.

        :meth:`shutdown` passes the canonical name ``'recv'`` or ``'send'``.
        This base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1602

1603

1604
    # Packing helpers: build bytes with the matching `packing` function and
    # send them on the tube.  All arguments are forwarded unchanged.
    def p64(self, *a, **kw):        return self.send(packing.p64(*a, **kw))
    def p32(self, *a, **kw):        return self.send(packing.p32(*a, **kw))
    def p16(self, *a, **kw):        return self.send(packing.p16(*a, **kw))
    def p8(self, *a, **kw):         return self.send(packing.p8(*a, **kw))
    def pack(self, *a, **kw):       return self.send(packing.pack(*a, **kw))

    # Unpacking helpers: receive a fixed number of bytes with recvn() and
    # decode them with the matching `packing` function.  unpack() reads
    # `context.bytes` bytes.
    def u64(self, *a, **kw):        return packing.u64(self.recvn(8), *a, **kw)
    def u32(self, *a, **kw):        return packing.u32(self.recvn(4), *a, **kw)
    def u16(self, *a, **kw):        return packing.u16(self.recvn(2), *a, **kw)
    def u8(self, *a, **kw):         return packing.u8(self.recvn(1), *a, **kw)
    def unpack(self, *a, **kw):     return packing.unpack(self.recvn(context.bytes), *a, **kw)

    # Send the result of packing.flat() / packing.fit() on the tube.
    def flat(self, *a, **kw):       return self.send(packing.flat(*a,**kw))
    def fit(self, *a, **kw):        return self.send(packing.fit(*a, **kw))
1!
1618

1619
    # Dynamic functions
1620

1621
    # Class-body helper (deleted again below): for a receive method `func`,
    # build two variants named `<func>b` and `<func>S`.
    def make_wrapper(func):
        def wrapperb(self, *a, **kw):
            # Same result, converted to a mutable bytearray.
            return bytearray(func(self, *a, **kw))
        def wrapperS(self, *a, **kw):
            # Same result, decoded with packing._decode.
            return packing._decode(func(self, *a, **kw))
        wrapperb.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a bytearray'.format(func=func)
        wrapperb.__name__ = func.__name__ + 'b'
        wrapperS.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a str, ' \
                           'decoding the result using `context.encoding`. ' \
                           '(note that the binary versions are way faster)'.format(func=func)
        wrapperS.__name__ = func.__name__ + 'S'
        return wrapperb, wrapperS

    # Install the `b` and `S` variants of each listed method into the class
    # namespace under the wrapper's own __name__.
    for func in [recv,
                 recvn,
                 recvall,
                 recvrepeat,
                 recvuntil,
                 recvpred,
                 recvregex,
                 recvline,
                 recvline_contains,
                 recvline_startswith,
                 recvline_endswith,
                 recvline_regex]:
        for wrapper in make_wrapper(func):
            locals()[wrapper.__name__] = wrapper

    # Redefined with a different purpose: build a plain alias `alias` that
    # forwards every call to `func`.
    def make_wrapper(func, alias):
        def wrapper(self, *a, **kw):
            return func(self, *a, **kw)
        wrapper.__doc__ = 'Alias for :meth:`{func.__name__}`'.format(func=func)
        wrapper.__name__ = alias
        return wrapper

    # For every name defined so far that contains 'recv' or 'send', add a
    # twin with 'read' / 'write' substituted.  list() snapshots the names
    # because the namespace is modified inside the loop.
    for _name in list(locals()):
        if 'recv' in _name:
            _name2 = _name.replace('recv', 'read')
        elif 'send' in _name:
            _name2 = _name.replace('send', 'write')
        else:
            continue
        locals()[_name2] = make_wrapper(locals()[_name], _name2)

    # Clean up the scope
    del wrapper, func, make_wrapper, _name, _name2
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc