• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 11186931190

04 Oct 2024 09:02PM UTC coverage: 73.934% (+0.02%) from 73.91%
11186931190

Pull #2476

github

web-flow
Merge ddef6b397 into 9f92ed04f
Pull Request #2476: Deprecate 'keepends' argument in favor of 'drop'

4563 of 7419 branches covered (61.5%)

29 of 32 new or added lines in 1 file covered. (90.63%)

66 existing lines in 1 file now uncovered.

13260 of 17935 relevant lines covered (73.93%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.04
/pwnlib/tubes/tube.py
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import
1✔
3
from __future__ import division
1✔
4

5
import abc
1✔
6
import logging
1✔
7
import os
1✔
8
import re
1✔
9
import six
1✔
10
import string
1✔
11
import subprocess
1✔
12
import sys
1✔
13
import threading
1✔
14
import time
1✔
15

16
from six.moves import range
1✔
17

18
from pwnlib import atexit
1✔
19
from pwnlib import term
1✔
20
from pwnlib.context import context
1✔
21
from pwnlib.log import Logger
1✔
22
from pwnlib.timeout import Timeout
1✔
23
from pwnlib.tubes.buffer import Buffer
1✔
24
from pwnlib.util import misc
1✔
25
from pwnlib.util import packing
1✔
26

27

28
class tube(Timeout, Logger):
1✔
29
    """
30
    Container of all the tube functions common to sockets, TTYs and SSH connections.
31
    """
32

33
    default = Timeout.default
1✔
34
    forever = Timeout.forever
1✔
35

36
    def __init__(self, timeout = default, level = None, *a, **kw):
1✔
37
        super(tube, self).__init__(timeout)
1✔
38

39
        Logger.__init__(self, None)
1✔
40
        if level is not None:
1✔
41
            self.setLevel(level)
1✔
42

43
        self.buffer = Buffer(*a, **kw)
1✔
44
        self._newline = None
1✔
45
        atexit.register(self.close)
1✔
46

47
    def _normalize_drop_keepends(self, drop, keepends, drop_default):
1✔
48
        '''
49
        >>> t = tube()
50
        >>> t._normalize_drop_keepends(None, None, True)
51
        True
52
        >>> t._normalize_drop_keepends(None, None, False)
53
        False
54
        >>> t._normalize_drop_keepends(True, None, True)
55
        True
56
        >>> t._normalize_drop_keepends(True, None, False)
57
        True
58
        >>> t._normalize_drop_keepends(None, True, True)
59
        False
60
        >>> t._normalize_drop_keepends(None, True, False)
61
        False
62
        >>> t._normalize_drop_keepends(False, None, True)
63
        False
64
        >>> t._normalize_drop_keepends(False, None, False)
65
        False
66
        >>> t._normalize_drop_keepends(None, False, True)
67
        True
68
        >>> t._normalize_drop_keepends(None, False, False)
69
        True
70
        >>> t._normalize_drop_keepends(True, False, False)
71
        Traceback (most recent call last):
72
            ...
73
        pwnlib.exception.PwnlibException: 'drop' and 'keepends' arguments cannot be used together.
74
        '''
75
        if keepends is not None:
1✔
76
            self.warn_once("'keepends' argument is deprecated. Use 'drop' instead.")
1✔
77
        if drop is None and keepends is None:
1✔
78
            return drop_default
1✔
79
        elif drop is not None:
1✔
80
            if keepends is not None:
1✔
81
                self.error("'drop' and 'keepends' arguments cannot be used together.")
1✔
82
            return drop
1✔
83
        return not keepends
1✔
84

85
    @property
1✔
86
    def newline(self):
1✔
87
        r'''Character sent with methods like sendline() or used for recvline().
88

89
            >>> t = tube()
90
            >>> t.newline = b'X'
91
            >>> t.unrecv(b'A\nB\nCX')
92
            >>> t.recvline()
93
            b'A\nB\nCX'
94

95
            >>> t = tube()
96
            >>> context.newline = b'\r\n'
97
            >>> t.newline
98
            b'\r\n'
99

100
            # Clean up
101
            >>> context.clear()
102
        '''
103
        if self._newline is not None:
1✔
104
            return self._newline
1✔
105
        return context.newline
1✔
106

107
    @newline.setter
1✔
108
    def newline(self, newline):
1✔
109
        self._newline = packing._need_bytes(newline)
1✔
110

111
    # Functions based on functions from subclasses
112
    def recv(self, numb = None, timeout = default):
1✔
113
        r"""recv(numb = 4096, timeout = default) -> bytes
114

115
        Receives up to `numb` bytes of data from the tube, and returns
116
        as soon as any quantity of data is available.
117

118
        If the request is not satisfied before ``timeout`` seconds pass,
119
        all data is buffered and an empty string (``''``) is returned.
120

121
        Raises:
122
            exceptions.EOFError: The connection is closed
123

124
        Returns:
125
            A bytes object containing bytes received from the socket,
126
            or ``''`` if a timeout occurred while waiting.
127

128
        Examples:
129

130
            >>> t = tube()
131
            >>> # Fake a data source
132
            >>> t.recv_raw = lambda n: b'Hello, world'
133
            >>> t.recv() == b'Hello, world'
134
            True
135
            >>> t.unrecv(b'Woohoo')
136
            >>> t.recv() == b'Woohoo'
137
            True
138
            >>> with context.local(log_level='debug'):
139
            ...    _ = t.recv()
140
            [...] Received 0xc bytes:
141
                b'Hello, world'
142
        """
143
        numb = self.buffer.get_fill_size(numb)
1✔
144
        return self._recv(numb, timeout) or b''
1✔
145

146
    def unrecv(self, data):
1✔
147
        """unrecv(data)
148

149
        Puts the specified data back at the beginning of the receive
150
        buffer.
151

152
        Examples:
153

154
            >>> t = tube()
155
            >>> t.recv_raw = lambda n: b'hello'
156
            >>> t.recv()
157
            b'hello'
158
            >>> t.recv()
159
            b'hello'
160
            >>> t.unrecv(b'world')
161
            >>> t.recv()
162
            b'world'
163
            >>> t.recv()
164
            b'hello'
165
        """
166
        data = packing._need_bytes(data)
1✔
167
        self.buffer.unget(data)
1✔
168

169
    def _fillbuffer(self, timeout = default):
1✔
170
        """_fillbuffer(timeout = default)
171

172
        Fills the internal buffer from the pipe, by calling
173
        :meth:`recv_raw` exactly once.
174

175
        Returns:
176

177
            The bytes of data received, or ``''`` if no data was received.
178

179
        Examples:
180

181
            >>> t = tube()
182
            >>> t.recv_raw = lambda *a: b'abc'
183
            >>> len(t.buffer)
184
            0
185
            >>> t._fillbuffer()
186
            b'abc'
187
            >>> len(t.buffer)
188
            3
189
        """
190
        data = b''
1✔
191

192
        with self.local(timeout):
1✔
193
            data = self.recv_raw(self.buffer.get_fill_size())
1✔
194

195
        if data and self.isEnabledFor(logging.DEBUG):
1✔
196
            self.debug('Received %#x bytes:' % len(data))
1✔
197
            self.maybe_hexdump(data, level=logging.DEBUG)
1✔
198
        if data:
1✔
199
            self.buffer.add(data)
1✔
200

201
        return data
1✔
202

203

204
    def _recv(self, numb = None, timeout = default):
1✔
205
        """_recv(numb = 4096, timeout = default) -> bytes
206

207
        Receives one chunk of data from the internal buffer or from the OS if the
208
        buffer is empty.
209
        """
210
        numb = self.buffer.get_fill_size(numb)
1✔
211

212
        # No buffered data, could not put anything in the buffer
213
        # before timeout.
214
        if not self.buffer and not self._fillbuffer(timeout):
1✔
215
            return b''
1✔
216

217
        return self.buffer.get(numb)
1✔
218

219
    def recvpred(self, pred, timeout = default):
1✔
220
        """recvpred(pred, timeout = default) -> bytes
221

222
        Receives one byte at a time from the tube, until ``pred(all_bytes)``
223
        evaluates to True.
224

225
        If the request is not satisfied before ``timeout`` seconds pass,
226
        all data is buffered and an empty string (``''``) is returned.
227

228
        Arguments:
229
            pred(callable): Function to call, with the currently-accumulated data.
230
            timeout(int): Timeout for the operation
231

232
        Raises:
233
            exceptions.EOFError: The connection is closed
234

235
        Returns:
236
            A bytes object containing bytes received from the socket,
237
            or ``''`` if a timeout occurred while waiting.
238

239
        Examples:
240

241
            >>> t = tube()
242
            >>> t.recv_raw = lambda n: b'abbbaccc'
243
            >>> pred = lambda p: p.count(b'a') == 2
244
            >>> t.recvpred(pred)
245
            b'abbba'
246
            >>> pred = lambda p: p.count(b'd') > 0
247
            >>> t.recvpred(pred, timeout=0.05)
248
            b''
249
        """
250

251
        data = b''
1✔
252

253
        with self.countdown(timeout):
1✔
254
            while not pred(data):
1✔
255
                if not self.countdown_active():
1✔
256
                    self.unrecv(data)
1✔
257
                    return b''
1✔
258

259
                try:
1✔
260
                    res = self.recv(1, timeout=timeout)
1✔
261
                except Exception:
×
262
                    self.unrecv(data)
×
263
                    return b''
×
264

265
                if res:
1!
266
                    data += res
1✔
267
                else:
268
                    self.unrecv(data)
×
269
                    return b''
×
270

271
        return data
1✔
272

273
    def recvn(self, numb, timeout = default):
1✔
274
        """recvn(numb, timeout = default) -> bytes
275

276
        Receives exactly `numb` bytes.
277

278
        If the request is not satisfied before ``timeout`` seconds pass,
279
        all data is buffered and an empty string (``''``) is returned.
280

281
        Raises:
282
            exceptions.EOFError: The connection closed before the request could be satisfied
283

284
        Returns:
285
            A string containing bytes received from the socket,
286
            or ``''`` if a timeout occurred while waiting.
287

288
        Examples:
289

290
            >>> t = tube()
291
            >>> data = b'hello world'
292
            >>> t.recv_raw = lambda *a: data
293
            >>> t.recvn(len(data)) == data
294
            True
295
            >>> t.recvn(len(data)+1) == data + data[:1]
296
            True
297
            >>> t.recv_raw = lambda *a: None
298
            >>> # The remaining data is buffered
299
            >>> t.recv() == data[1:]
300
            True
301
            >>> t.recv_raw = lambda *a: time.sleep(0.01) or b'a'
302
            >>> t.recvn(10, timeout=0.05)
303
            b''
304
            >>> t.recvn(10, timeout=0.06)
305
            b'aaaaaa...'
306
        """
307
        # Keep track of how much data has been received
308
        # It will be pasted together at the end if a
309
        # timeout does not occur, or put into the tube buffer.
310
        with self.countdown(timeout):
1✔
311
            while self.countdown_active() and len(self.buffer) < numb and self._fillbuffer(self.timeout):
1✔
312
                pass
1✔
313

314
        if len(self.buffer) < numb:
1✔
315
            return b''
1✔
316

317
        return self.buffer.get(numb)
1✔
318

319
    def recvuntil(self, delims, drop=False, timeout=default):
1✔
320
        """recvuntil(delims, drop=False, timeout=default) -> bytes
321

322
        Receive data until one of `delims` is encountered.
323

324
        If the request is not satisfied before ``timeout`` seconds pass,
325
        all data is buffered and an empty string (``''``) is returned.
326

327
        Arguments:
328
            delims(bytes,tuple): A single delimiter byte-string, or a tuple of delimiter byte-strings.
329
            drop(bool): Drop the ending.  If :const:`True` it is removed from the end of the return value.
330

331
        Raises:
332
            exceptions.EOFError: The connection closed before the request could be satisfied
333

334
        Returns:
335
            A string containing bytes received from the socket,
336
            or ``''`` if a timeout occurred while waiting.
337

338
        Examples:
339

340
            >>> t = tube()
341
            >>> t.recv_raw = lambda n: b"Hello World!"
342
            >>> t.recvuntil(b' ')
343
            b'Hello '
344
            >>> _=t.clean(0)
345
            >>> # Matches on 'o' in 'Hello'
346
            >>> t.recvuntil((b' ',b'W',b'o',b'r'))
347
            b'Hello'
348
            >>> _=t.clean(0)
349
            >>> # Matches expressly full string
350
            >>> t.recvuntil(b' Wor')
351
            b'Hello Wor'
352
            >>> _=t.clean(0)
353
            >>> # Matches on full string, drops match
354
            >>> t.recvuntil(b' Wor', drop=True)
355
            b'Hello'
356

357
            >>> # Try with regex special characters
358
            >>> t = tube()
359
            >>> t.recv_raw = lambda n: b"Hello|World"
360
            >>> t.recvuntil(b'|', drop=True)
361
            b'Hello'
362

363
        """
364
        # Convert string into singleton tuple
365
        if isinstance(delims, (bytes, bytearray, six.text_type)):
1✔
366
            delims = (delims,)
1✔
367
        delims = tuple(map(packing._need_bytes, delims))
1✔
368

369
        # Longest delimiter for tracking purposes
370
        longest = max(map(len, delims))
1✔
371

372
        # Cumulative data to search
373
        data = []
1✔
374
        top = b''
1✔
375

376
        with self.countdown(timeout):
1!
377
            while self.countdown_active():
1!
378
                try:
1✔
379
                    res = self.recv(timeout=self.timeout)
1✔
380
                except Exception:
1✔
381
                    self.unrecv(b''.join(data) + top)
1✔
382
                    raise
1✔
383

384
                if not res:
1✔
385
                    self.unrecv(b''.join(data) + top)
1✔
386
                    return b''
1✔
387

388
                top += res
1✔
389
                start = len(top)
1✔
390
                for d in delims:
1✔
391
                    j = top.find(d)
1✔
392
                    if start > j > -1:
1✔
393
                        start = j
1✔
394
                        end = j + len(d)
1✔
395
                if start < len(top):
1✔
396
                    self.unrecv(top[end:])
1✔
397
                    if drop:
1✔
398
                        top = top[:start]
1✔
399
                    else:
400
                        top = top[:end]
1✔
401
                    return b''.join(data) + top
1✔
402
                if len(top) > longest:
1✔
403
                    i = -longest - 1
1✔
404
                    data.append(top[:i])
1✔
405
                    top = top[i:]
1✔
406

407
        return b''
×
408

409
    def recvlines(self, numlines=2**20, drop=None, keepends=None, timeout=default):
1✔
410
        r"""recvlines(numlines, drop=True, timeout=default) -> list of bytes objects
411

412
        Receive up to ``numlines`` lines.
413

414
        A "line" is any sequence of bytes terminated by the byte sequence
415
        set by :attr:`newline`, which defaults to ``'\n'``.
416

417
        If the request is not satisfied before ``timeout`` seconds pass,
418
        all data is buffered and an empty string (``''``) is returned.
419

420
        Arguments:
421
            numlines(int): Maximum number of lines to receive
422
            drop(bool): Drop newlines at the end of each line (:const:`True`).
423
            timeout(int): Maximum timeout
424

425
        Raises:
426
            exceptions.EOFError: The connection closed before the request could be satisfied
427

428
        Returns:
429
            A list of the lines received from the tube; the list may be
430
            shorter than ``numlines`` if a timeout occurred while waiting.
431

432
        Examples:
433

434
            >>> t = tube()
435
            >>> t.recv_raw = lambda n: b'\n'
436
            >>> t.recvlines(3)
437
            [b'', b'', b'']
438
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
439
            >>> t.recvlines(3)
440
            [b'Foo', b'Bar', b'Baz']
441
            >>> t.recvlines(3, drop=False)
442
            [b'Foo\n', b'Bar\n', b'Baz\n']
443
        """
444
        drop = self._normalize_drop_keepends(drop, keepends, True)
1✔
445
        del keepends
1✔
446

447
        lines = []
1✔
448
        with self.countdown(timeout):
1✔
449
            for _ in range(numlines):
1✔
450
                try:
1✔
451
                    # We must set 'drop' to False here so that we can
452
                    # restore the original, unmodified data to the buffer
453
                    # in the event of a timeout.
454
                    res = self.recvline(drop=False, timeout=timeout)
1✔
NEW
455
                except Exception:
×
UNCOV
456
                    self.unrecv(b''.join(lines))
×
457
                    raise
×
458

459
                if res:
1!
460
                    lines.append(res)
1✔
461
                else:
UNCOV
462
                    break
×
463

464
        if drop:
1✔
465
            lines = [line[:-len(self.newline)] for line in lines]
1✔
466

467
        return lines
1✔
468

469
    def recvlinesS(self, numlines=2**20, drop=None, keepends=None, timeout=default):
1✔
470
        r"""recvlinesS(numlines, drop=True, timeout=default) -> str list
471

472
        This function is identical to :meth:`recvlines`, but decodes
473
        the received bytes into string using :func:`context.encoding`.
474
        You should use :meth:`recvlines` whenever possible for better performance.
475

476
        Examples:
477

478
            >>> t = tube()
479
            >>> t.recv_raw = lambda n: b'\n'
480
            >>> t.recvlinesS(3)
481
            ['', '', '']
482
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
483
            >>> t.recvlinesS(3)
484
            ['Foo', 'Bar', 'Baz']
485
        """
486
        return [packing._decode(x) for x in self.recvlines(numlines, drop=drop, keepends=keepends, timeout=timeout)]
1✔
487

488
    def recvlinesb(self, numlines=2**20, drop=None, keepends=None, timeout=default):
1✔
489
        r"""recvlinesb(numlines, drop=True, timeout=default) -> bytearray list
490

491
        This function is identical to :meth:`recvlines`, but returns a bytearray.
492

493
        Examples:
494

495
            >>> t = tube()
496
            >>> t.recv_raw = lambda n: b'\n'
497
            >>> t.recvlinesb(3)
498
            [bytearray(b''), bytearray(b''), bytearray(b'')]
499
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
500
            >>> t.recvlinesb(3)
501
            [bytearray(b'Foo'), bytearray(b'Bar'), bytearray(b'Baz')]
502
        """
503
        return [bytearray(x) for x in self.recvlines(numlines, drop=drop, keepends=keepends, timeout=timeout)]
1✔
504

505
    def recvline(self, drop=None, keepends=None, timeout=default):
1✔
506
        r"""recvline(drop=False, timeout=default) -> bytes
507

508
        Receive a single line from the tube.
509

510
        A "line" is any sequence of bytes terminated by the byte sequence
511
        set in :attr:`newline`, which defaults to ``b'\n'``.
512

513
        If the connection is closed (:class:`EOFError`) before a newline
514
        is received, the buffered data is returned by default and a warning
515
        is logged. If the buffer is empty, an :class:`EOFError` is raised.
516
        This behavior can be changed by setting :meth:`pwnlib.context.ContextType.throw_eof_on_incomplete_line`.
517

518
        If the request is not satisfied before ``timeout`` seconds pass,
519
        all data is buffered and an empty byte string (``b''``) is returned.
520

521
        Arguments:
522
            drop(bool): Drop the line ending (:const:`False`).
523
            timeout(int): Timeout
524

525
        Raises:
526
            :class:`EOFError`: The connection closed before the request
527
                                 could be satisfied and the buffer is empty
528

529
        Return:
530
            All bytes received over the tube until the first
531
            newline ``'\n'`` is received.  Optionally retains
532
            the ending. If the connection is closed before a newline
533
            is received, the remaining data received up to this point
534
            is returned.
535

536

537
        Examples:
538

539
            >>> t = tube()
540
            >>> t.recv_raw = lambda n: b'Foo\nBar\r\nBaz\n'
541
            >>> t.recvline()
542
            b'Foo\n'
543
            >>> t.recvline()
544
            b'Bar\r\n'
545
            >>> t.recvline(drop=True)
546
            b'Baz'
547
            >>> t.newline = b'\r\n'
548
            >>> t.recvline(drop=True)
549
            b'Foo\nBar'
550
            >>> t = tube()
551
            >>> def _recv_eof(n):
552
            ...     if not _recv_eof.throw:
553
            ...         _recv_eof.throw = True
554
            ...         return b'real line\ntrailing data'
555
            ...     raise EOFError
556
            >>> _recv_eof.throw = False
557
            >>> t.recv_raw = _recv_eof
558
            >>> t.recvline()
559
            b'real line\n'
560
            >>> t.recvline()
561
            b'trailing data'
562
            >>> t.recvline()
563
            Traceback (most recent call last):
564
                ...
565
            EOFError
566
        """
567
        drop = self._normalize_drop_keepends(drop, keepends, False)
1✔
568
        del keepends
1✔
569

570
        try:
1✔
571
            return self.recvuntil(self.newline, drop=drop, timeout=timeout)
1✔
572
        except EOFError:
1✔
573
            if not context.throw_eof_on_incomplete_line and self.buffer.size > 0:
1✔
574
                if context.throw_eof_on_incomplete_line is None:
1!
575
                    self.warn_once('EOFError during recvline. Returning buffered data without trailing newline.')
1✔
576
                return self.buffer.get()
1✔
577
            raise
1✔
578

579
    def recvline_pred(self, pred, drop=None, keepends=None, timeout=default):
1✔
580
        r"""recvline_pred(pred, drop=True, timeout=default) -> bytes
581

582
        Receive data until ``pred(line)`` returns a truthy value.
583
        Drop all other data.
584

585
        If the request is not satisfied before ``timeout`` seconds pass,
586
        all data is buffered and an empty string (``''``) is returned.
587

588
        Arguments:
589
            pred(callable): Function to call.  Returns the line for which
590
                this function returns :const:`True`.
591
            drop(bool): Drop the line ending (:const:`True`).
592

593
        Examples:
594

595
            >>> t = tube()
596
            >>> t.recv_raw = lambda n: b"Foo\nBar\nBaz\n"
597
            >>> t.recvline_pred(lambda line: line == b"Bar\n")
598
            b'Bar'
599
            >>> t.recvline_pred(lambda line: line == b"Bar\n", drop=False)
600
            b'Bar\n'
601
            >>> t.recvline_pred(lambda line: line == b'Nope!', timeout=0.1)
602
            b''
603
        """
604
        drop = self._normalize_drop_keepends(drop, keepends, True)
1✔
605
        del keepends
1✔
606

607
        tmpbuf = Buffer()
1✔
608
        line   = b''
1✔
609
        with self.countdown(timeout):
1✔
610
            while self.countdown_active():
1✔
611
                try:
1✔
612
                    line = self.recvline(drop=False)
1✔
UNCOV
613
                except Exception:
×
UNCOV
614
                    self.buffer.unget(tmpbuf)
×
UNCOV
615
                    raise
×
616

617
                if not line:
1!
NEW
618
                    self.buffer.unget(tmpbuf)
×
619
                    return b''
×
620

621
                if pred(line):
1✔
622
                    if drop:
1✔
623
                        line = line[:-len(self.newline)]
1✔
624
                    return line
1✔
625
                else:
626
                    tmpbuf.add(line)
1✔
627

628
        return b''
1✔
629

630
    def recvline_contains(self, items, drop=None, keepends=None, timeout=default):
1✔
631
        r"""recvline_contains(items, drop=True, timeout=default) -> bytes
632

633
        Receive lines until one line is found which contains at least
634
        one of `items`.
635

636
        Arguments:
637
            items(str,tuple): List of strings to search for, or a single string.
638
            drop(bool): Drop the line ending (:const:`True`).
639
            timeout(int): Timeout, in seconds
640

641
        Examples:
642

643
            >>> t = tube()
644
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
645
            >>> t.recvline_contains(b'r')
646
            b'World'
647
            >>> f = lambda n: b"cat dog bird\napple pear orange\nbicycle car train\n"
648
            >>> t = tube()
649
            >>> t.recv_raw = f
650
            >>> t.recvline_contains(b'pear')
651
            b'apple pear orange'
652
            >>> t = tube()
653
            >>> t.recv_raw = f
654
            >>> t.recvline_contains((b'car', b'train'))
655
            b'bicycle car train'
656
        """
657
        if isinstance(items, (bytes, bytearray, six.text_type)):
1✔
658
            items = (items,)
1✔
659
        items = tuple(map(packing._need_bytes, items))
1✔
660

661
        def pred(line):
1✔
662
            return any(d in line for d in items)
1✔
663

664
        return self.recvline_pred(pred, drop=drop, keepends=keepends, timeout=timeout)
1✔
665

666
    def recvline_startswith(self, delims, drop=None, keepends=None, timeout=default):
1✔
667
        r"""recvline_startswith(delims, drop=True, timeout=default) -> bytes
668

669
        Keep receiving lines until one is found that starts with one of
670
        `delims`.  Returns the last line received.
671

672
        If the request is not satisfied before ``timeout`` seconds pass,
673
        all data is buffered and an empty string (``''``) is returned.
674

675
        Arguments:
676
            delims(str,tuple): List of strings to search for, or a single string.
677
            drop(bool): Drop the line ending (:const:`True`).
678
            timeout(int): Timeout, in seconds
679

680
        Returns:
681
            The first line received which starts with a delimiter in ``delims``.
682

683
        Examples:
684

685
            >>> t = tube()
686
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
687
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'))
688
            b'World'
689
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'), drop=False)
690
            b'Xylophone\n'
691
            >>> t.recvline_startswith(b'Wo')
692
            b'World'
693
        """
694
        # Convert string into singleton tuple
695
        if isinstance(delims, (bytes, bytearray, six.text_type)):
1✔
696
            delims = (delims,)
1✔
697
        delims = tuple(map(packing._need_bytes, delims))
1✔
698

699
        return self.recvline_pred(lambda line: any(map(line.startswith, delims)),
1✔
700
                                  drop=drop,
701
                                  keepends=keepends,
702
                                  timeout=timeout)
703

704
    def recvline_endswith(self, delims, drop=None, keepends=None, timeout=default):
1✔
705
        r"""recvline_endswith(delims, drop=True, timeout=default) -> bytes
706

707
        Keep receiving lines until one is found that ends with one of
708
        `delims`.  Returns the last line received.
709

710
        If the request is not satisfied before ``timeout`` seconds pass,
711
        all data is buffered and an empty string (``''``) is returned.
712

713
        See :meth:`recvline_startswith` for more details.
714

715
        Examples:
716

717
            >>> t = tube()
718
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\nKaboodle\n'
719
            >>> t.recvline_endswith(b'r')
720
            b'Bar'
721
            >>> t.recvline_endswith((b'a',b'b',b'c',b'd',b'e'), drop=False)
722
            b'Kaboodle\n'
723
            >>> t.recvline_endswith(b'oodle')
724
            b'Kaboodle'
725
        """
726
        # Convert string into singleton tuple
727
        if isinstance(delims, (bytes, bytearray, six.text_type)):
1✔
728
            delims = (delims,)
1✔
729

730
        delims = tuple(packing._need_bytes(delim) + self.newline for delim in delims)
1✔
731

732
        return self.recvline_pred(lambda line: any(map(line.endswith, delims)),
1✔
733
                                  drop=drop,
734
                                  keepends=keepends,
735
                                  timeout=timeout)
736

737
    def recvregex(self, regex, exact=False, timeout=default, capture=False):
1✔
738
        r"""recvregex(regex, exact=False, timeout=default, capture=False) -> bytes
739

740
        Wrapper around :func:`recvpred`, which will return when a regex
741
        matches the string in the buffer.
742

743
        Returns all received data up until the regex matched. If `capture` is
744
        set to True, a :class:`re.Match` object is returned instead.
745

746
        By default :func:`re.RegexObject.search` is used, but if `exact` is
747
        set to True, then :func:`re.RegexObject.match` will be used instead.
748

749
        If the request is not satisfied before ``timeout`` seconds pass,
750
        all data is buffered and an empty string (``''``) is returned.
751

752
        Examples:
753

754
            >>> t = tube()
755
            >>> t.recv_raw = lambda n: b'The lucky number is 1337 as always\nBla blubb blargh\n'
756
            >>> m = t.recvregex(br'number is ([0-9]+) as always\n', capture=True)
757
            >>> m.group(1)
758
            b'1337'
759
            >>> t.recvregex(br'Bla .* blargh\n')
760
            b'Bla blubb blargh\n'
761
        """
762

763
        if isinstance(regex, (bytes, bytearray, six.text_type)):
1!
764
            regex = packing._need_bytes(regex)
1✔
765
            regex = re.compile(regex)
1✔
766

767
        if exact:
1!
UNCOV
768
            pred = regex.match
×
769
        else:
770
            pred = regex.search
1✔
771

772
        if capture:
1✔
773
            return pred(self.recvpred(pred, timeout = timeout))
1✔
774
        else:
775
            return self.recvpred(pred, timeout = timeout)
1✔
776

777
    def recvline_regex(self, regex, exact=False, drop=None, keepends=None, timeout=default):
1✔
778
        """recvline_regex(regex, exact=False, drop=True, timeout=default) -> bytes
779

780
        Wrapper around :func:`recvline_pred`, which will return when a regex
781
        matches a line.
782

783
        By default :func:`re.RegexObject.search` is used, but if `exact` is
784
        set to True, then :func:`re.RegexObject.match` will be used instead.
785

786
        If the request is not satisfied before ``timeout`` seconds pass,
787
        all data is buffered and an empty string (``''``) is returned.
788
        """
789

790
        if isinstance(regex, (bytes, bytearray, six.text_type)):
1!
791
            regex = packing._need_bytes(regex)
1✔
792
            regex = re.compile(regex)
1✔
793

794
        if exact:
1!
UNCOV
795
            pred = regex.match
×
796
        else:
797
            pred = regex.search
1✔
798

799
        return self.recvline_pred(pred, drop=drop, keepends=keepends, timeout=timeout)
1✔
800

801
    def recvrepeat(self, timeout=default):
        """recvrepeat(timeout=default) -> bytes

        Keeps filling the internal buffer until a fill attempt yields
        nothing (timeout) or EOF is reached, then returns everything
        that was buffered.

        Examples:

            >>> data = [
            ... b'd',
            ... b'', # simulate timeout
            ... b'c',
            ... b'b',
            ... b'a',
            ... ]
            >>> def delayrecv(n, data=data):
            ...     return data.pop()
            >>> t = tube()
            >>> t.recv_raw = delayrecv
            >>> t.recvrepeat(0.2)
            b'abc'
            >>> t.recv()
            b'd'
        """

        try:
            got_data = True
            while got_data:
                got_data = self._fillbuffer(timeout=timeout)
        except EOFError:
            # EOF simply ends the collection; whatever is buffered is returned.
            pass

        return self.buffer.get()
1✔
832

833
    def recvall(self, timeout=Timeout.forever):
        """recvall(timeout=Timeout.forever) -> bytes

        Receives data until EOF is reached and closes the tube.

        Progress (the buffered size so far) is reported through a
        ``waitfor`` status line while receiving.  The whole buffer content
        is returned after the tube has been closed.
        """

        with self.waitfor('Receiving all data') as progress:
            size = len(self.buffer)
            with self.local(timeout):
                try:
                    more = True
                    while more:
                        # Report the human-readable size before each fill attempt.
                        size = misc.size(len(self.buffer))
                        progress.status(size)
                        more = self._fillbuffer()
                except EOFError:
                    pass
            progress.success("Done (%s)" % size)
        self.close()

        return self.buffer.get()
1✔
854

855
    def send(self, data):
        """send(data)

        Sends data.

        The data is converted to bytes first.  If log level ``DEBUG`` is
        enabled, the number of bytes and a dump of the data being sent are
        logged as well.

        If it is not possible to send anymore because of a closed
        connection, it raises ``exceptions.EOFError``

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.send(b'hello')
            b'hello'
        """

        payload = packing._need_bytes(data)

        debugging = self.isEnabledFor(logging.DEBUG)
        if debugging:
            self.debug('Sent %#x bytes:' % len(payload))
            self.maybe_hexdump(payload, level=logging.DEBUG)

        self.send_raw(payload)
1✔
882

883
    def sendline(self, line=b''):
        r"""sendline(data)

        Converts ``line`` to bytes, appends the tube's ``newline`` and
        passes the result to :meth:`send`; i.e. a shorthand for
        ``t.send(data + t.newline)``.

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.sendline(b'hello')
            b'hello\n'
            >>> t.newline = b'\r\n'
            >>> t.sendline(b'hello')
            b'hello\r\n'
        """

        payload = packing._need_bytes(line)
        terminated = payload + self.newline

        self.send(terminated)
1✔
903

904
    def sendlines(self, lines=()):
        """sendlines(lines)

        Sends every element of ``lines``, in order, using :meth:`sendline`.

        Arguments:
            lines(iterable): The lines to send.  Each element is converted
                to bytes before being handed to :meth:`sendline`.  Defaults
                to an empty (immutable) sequence, in which case nothing is
                sent.
        """
        for line in lines:
            self.sendline(packing._need_bytes(line))
×
908

909
    def sendafter(self, delim, data, timeout = default):
        """sendafter(delim, data, timeout = default) -> str

        Waits for ``delim`` with ``recvuntil(delim, timeout=timeout)`` and
        then performs ``send(data)``.  Returns what ``recvuntil`` returned.
        """

        # Convert up front so a bad payload is noticed before any receiving.
        payload = packing._need_bytes(data)
        received = self.recvuntil(delim, timeout=timeout)
        self.send(payload)
        return received
×
919

920
    def sendlineafter(self, delim, data, timeout = default):
        """sendlineafter(delim, data, timeout = default) -> str

        Waits for ``delim`` with ``recvuntil(delim, timeout=timeout)`` and
        then performs ``sendline(data)``.  Returns what ``recvuntil``
        returned.
        """

        # Convert up front so a bad payload is noticed before any receiving.
        payload = packing._need_bytes(data)
        received = self.recvuntil(delim, timeout=timeout)
        self.sendline(payload)
        return received
×
929

930
    def sendthen(self, delim, data, timeout = default):
        """sendthen(delim, data, timeout = default) -> str

        Performs ``send(data)`` and then returns the result of
        ``recvuntil(delim, timeout=timeout)``.
        """

        payload = packing._need_bytes(data)
        self.send(payload)
        reply = self.recvuntil(delim, timeout=timeout)
        return reply
×
938

939
    def sendlinethen(self, delim, data, timeout = default):
        """sendlinethen(delim, data, timeout = default) -> str

        Performs ``sendline(data)`` and then returns the result of
        ``recvuntil(delim, timeout=timeout)``.
        """

        payload = packing._need_bytes(data)
        self.sendline(payload)
        reply = self.recvuntil(delim, timeout=timeout)
        return reply
×
947

948
    def interactive(self, prompt = term.text.bold_red('$') + ' '):
        """interactive(prompt = pwnlib.term.text.bold_red('$') + ' ')

        Does simultaneous reading and writing to the tube. In principle this just
        connects the tube to standard in and standard out, but in practice this
        is much more usable, since we are using :mod:`pwnlib.term` to print a
        floating prompt.

        Thus it only works while in :data:`pwnlib.term.term_mode`.

        A background thread copies everything received from the tube to
        stdout, while the calling thread forwards user input to the tube.
        The session ends on EOF in either direction, on end of input, or on
        ``KeyboardInterrupt``.

        Arguments:
            prompt: Prompt handed to ``term.readline.readline`` when running
                in term mode.
        """

        self.info('Switching to interactive mode')

        # Shared stop flag: set by the input loop, polled by the receiver thread.
        go = threading.Event()
        def recv_thread():
            while not go.is_set():
                try:
                    # Short timeout so the stop flag is re-checked regularly.
                    cur = self.recv(timeout = 0.05)
                    # Show the tube's newline convention as a plain '\n' locally.
                    cur = cur.replace(self.newline, b'\n')
                    if cur:
                        stdout = sys.stdout
                        if not term.term_mode:
                            # Outside term mode, prefer the binary stream when
                            # stdout has one, since ``cur`` is bytes.
                            stdout = getattr(stdout, 'buffer', stdout)
                        stdout.write(cur)
                        stdout.flush()
                except EOFError:
                    self.info('Got EOF while reading in interactive')
                    break

        t = context.Thread(target = recv_thread)
        t.daemon = True
        t.start()

        # NOTE: this ``term_mode`` comes from pwnlib.args and is distinct from
        # ``term.term_mode`` used elsewhere in this method.
        from pwnlib.args import term_mode
        try:
            os_linesep = os.linesep.encode()
            # Remaining bytes of an OS line separator that we expect to read
            # next (only relevant when the separator is longer than one byte).
            to_skip = b''
            while not go.is_set():
                if term.term_mode:
                    data = term.readline.readline(prompt = prompt, float = True)
                    # Translate the trailing '\n' into the tube's own newline.
                    if data.endswith(b'\n') and self.newline != b'\n':
                        data = data[:-1] + self.newline
                else:
                    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
                    # Read one byte at a time so line separators can be
                    # rewritten on the fly.
                    data = stdin.read(1)
                    # Keep OS's line separator if NOTERM is set and
                    # the user did not specify a custom newline
                    # even if stdin is a tty.
                    if sys.stdin.isatty() and (
                        term_mode
                        or context.newline != b"\n"
                        or self._newline is not None
                    ):
                        if to_skip:
                            if to_skip[:1] != data:
                                # The separator was started but not continued:
                                # emit the part already swallowed plus this byte.
                                # NOTE(review): ``to_skip`` is not reset on this
                                # path -- confirm that is intended.
                                data = os_linesep[: -len(to_skip)] + data
                            else:
                                to_skip = to_skip[1:]
                                if to_skip:
                                    continue
                                # Whole separator consumed: send the tube newline.
                                data = self.newline
                        # If we observe a prefix of the line separator in a tty,
                        # assume we'll see the rest of it immediately after.
                        # This could stall until the next character is seen if
                        # the line separator is started but never finished, but
                        # that is unlikely to happen in a dynamic tty.
                        elif data and os_linesep.startswith(data):
                            if len(os_linesep) > 1:
                                to_skip = os_linesep[1:]
                                continue
                            data = self.newline

                if data:
                    try:
                        self.send(data)
                    except EOFError:
                        go.set()
                        self.info('Got EOF while sending in interactive')
                else:
                    # Empty read means end of input: stop the session.
                    go.set()
        except KeyboardInterrupt:
            self.info('Interrupted')
            go.set()

        # Join in short slices rather than with one blocking join().
        while t.is_alive():
            t.join(timeout = 0.1)
1✔
1034

1035
    def stream(self, line_mode=True):
        """stream()

        Receive data until the tube exits, and print it to stdout.

        Similar to :func:`interactive`, except that no input is sent.

        Similar to ``print(tube.recvall())`` except that data is printed
        as it is received, rather than after all data is received.

        Receiving stops on ``EOFError`` or ``KeyboardInterrupt``; both are
        swallowed and the data collected so far is returned.

        Arguments:
            line_mode(bool): Whether to receive line-by-line or raw data.

        Returns:
            All data printed.
        """
        buf = Buffer()
        function = self.recvline if line_mode else self.recv
        try:
            while True:
                chunk = function()
                if not chunk:
                    # Nothing was received this round (presumably a timed-out
                    # read); there is nothing new to record or print.
                    continue
                buf.add(chunk)
                stdout = sys.stdout
                if not term.term_mode:
                    stdout = getattr(stdout, 'buffer', stdout)
                # Write the chunk we just received instead of peeking at the
                # buffer's internal chunk list, which may be empty or still
                # end with an older chunk.
                stdout.write(chunk)
                # Flush so output really appears as it is received, matching
                # what interactive() does.
                stdout.flush()
        except KeyboardInterrupt:
            pass
        except EOFError:
            pass

        return buf.get()
×
1066

1067
    def clean(self, timeout = 0.05):
        """clean(timeout = 0.05)

        Drains the tube and hands back everything that was drained.

        With a non-zero ``timeout`` this delegates to :meth:`recvrepeat`.
        If ``timeout`` is zero, only cached data will be cleared.

        Note: If timeout is set to zero, the underlying network is
        not actually polled; only the internal buffer is cleared.

        Returns:

            All data received

        Examples:

            >>> t = tube()
            >>> t.unrecv(b'clean me up')
            >>> t.clean(0)
            b'clean me up'
            >>> len(t.buffer)
            0
        """
        buffer_only = (timeout == 0)
        return self.buffer.get() if buffer_only else self.recvrepeat(timeout)
1✔
1095

1096
    def clean_and_log(self, timeout = 0.05):
        r"""clean_and_log(timeout = 0.05)

        Works exactly as :meth:`pwnlib.tubes.tube.tube.clean`, but the data
        is logged: the cleaning runs with the context log level forced to
        ``debug``, and data that was already sitting in the buffer is dumped
        explicitly unless ``DEBUG`` logging is enabled for this tube.

        Returns:

            All data received

        Examples:

            >>> def recv(n, data=[b'', b'hooray_data']):
            ...     while data: return data.pop()
            >>> t = tube()
            >>> t.recv_raw      = recv
            >>> t.connected_raw = lambda d: True
            >>> t.fileno        = lambda: 1234
            >>> with context.local(log_level='info'):
            ...     data = t.clean_and_log()
            [...] Received 0xb bytes:
                b'hooray_data'
            >>> data
            b'hooray_data'
            >>> context.clear()
        """
        buffered = self.buffer.get()
        if buffered:
            if not self.isEnabledFor(logging.DEBUG):
                with context.local(log_level='debug'):
                    self.debug('Received %#x bytes:' % len(buffered))
                    self.maybe_hexdump(buffered, level=logging.DEBUG)
        with context.local(log_level='debug'):
            fresh = self.clean(timeout)
            return buffered + fresh
1✔
1129

1130
    def connect_input(self, other):
        """connect_input(other)

        Connects the input of this tube to the output of another tube object.

        A daemon thread is started which repeatedly receives from ``other``
        and sends the data to this tube.  When the pump loop ends normally,
        this tube is shut down for sending and ``other`` for receiving.

        Arguments:
            other: The tube whose received data is forwarded into this tube.

        Examples:

            >>> def p(x): print(x.decode())
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> import time
            >>> _=(b.connect_input(a), time.sleep(0.1))
            data
        """

        def pump():
            import sys as _sys
            while self.countdown_active():
                # Stop as soon as either end can no longer carry data.
                if not (self.connected('send') and other.connected('recv')):
                    break

                try:
                    # Short timeout so the loop conditions are re-checked often.
                    data = other.recv(timeout = 0.05)
                except EOFError:
                    break

                # NOTE(review): presumably a guard for interpreter teardown,
                # leaving without touching the tubes -- confirm.
                if not _sys:
                    return

                # Timed-out receive: nothing to forward this round.
                if not data:
                    continue

                try:
                    self.send(data)
                except EOFError:
                    break

                if not _sys:
                    return

            # Normal loop exit: close the two directions this pump was serving.
            self.shutdown('send')
            other.shutdown('recv')

        t = context.Thread(target = pump)
        t.daemon = True
        t.start()
1✔
1186

1187
    def connect_output(self, other):
        """connect_output(other)

        Connects the output of this tube to the input of another tube object.

        This is the mirror image of :meth:`connect_input`: it simply calls
        ``other.connect_input(self)``.

        Arguments:
            other: The tube that should receive this tube's output.

        Examples:

            >>> def p(x): print(repr(x))
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> _=(a.connect_output(b), time.sleep(0.1))
            b'data'
        """

        other.connect_input(self)
1✔
1211

1212
    def connect_both(self, other):
        """connect_both(other)

        Connects the both ends of this tube object with another tube object,
        by calling :meth:`connect_input` and then :meth:`connect_output`
        with ``other``."""

        for link in (self.connect_input, self.connect_output):
            link(other)
1✔
1219

1220
    def spawn_process(self, *args, **kwargs):
        """Spawns a new process having this tube as stdin, stdout and stderr.

        The tube's :meth:`fileno` is passed as each of the three standard
        streams; all other arguments are handed to
        :class:`subprocess.Popen` untouched.

        Takes the same arguments as :class:`subprocess.Popen`."""

        launch = subprocess.Popen
        return launch(*args,
                      stdin = self.fileno(),
                      stdout = self.fileno(),
                      stderr = self.fileno(),
                      **kwargs)
1232

1233
    def __lshift__(self, other):
        """
        Shorthand for connecting multiple tubes.

        ``tube_a << tube_b`` calls ``tube_a.connect_input(tube_b)`` and
        evaluates to ``tube_b``, which is what makes chaining possible.

        See :meth:`connect_input` for more information.

        Examples:

            The following are equivalent ::

                tube_a << tube_b
                tube_a.connect_input(tube_b)

            This is useful when chaining multiple tubes ::

                tube_a << tube_b << tube_a
                tube_a.connect_input(tube_b)
                tube_b.connect_input(tube_a)
        """
        self.connect_input(other)
        # Return the right-hand operand so the next ``<<`` in a chain applies to it.
        return other
×
1254

1255
    def __rshift__(self, other):
        """
        Inverse of the ``<<`` operator.  See :meth:`__lshift__`.

        ``tube_a >> tube_b`` calls ``tube_a.connect_output(tube_b)`` and
        evaluates to ``tube_b`` so that the operator can be chained.

        See :meth:`connect_input` for more information.
        """
        self.connect_output(other)
        # Return the right-hand operand so the next ``>>`` in a chain applies to it.
        return other
×
1263

1264
    def __ne__(self, other):
        """
        Shorthand for connecting tubes to eachother.

        The following are equivalent ::

            a >> b >> a
            a <> b

        Note that ``<>`` is Python 2 syntax for the inequality operator;
        this method is what ``!=`` invokes as well.  Nothing is returned
        explicitly, so the comparison evaluates to ``None``.

        See :meth:`connect_input` for more information.
        """
        # (self << other) yields ``other``, which is then connected back to self.
        self << other << self
×
1276

1277
    def wait_for_close(self, timeout=default):
        """Waits until the tube is closed.

        Polls :meth:`connected` inside a countdown of ``timeout`` and
        returns as soon as the tube reports that it is no longer connected,
        or when the countdown is no longer active.
        """

        max_nap = 0.05
        with self.countdown(timeout):
            while self.countdown_active() and self.connected():
                # Never sleep longer than the time that is left.
                time.sleep(min(self.timeout, max_nap))

    wait = wait_for_close
1✔
1287

1288
    def can_recv(self, timeout = 0):
        """can_recv(timeout = 0) -> bool

        Returns True, if there is data available within `timeout` seconds.

        Buffered data counts as available; only when the buffer is empty is
        :meth:`can_recv_raw` consulted.

        Examples:

            >>> import time
            >>> t = tube()
            >>> t.can_recv_raw = lambda *a: False
            >>> t.can_recv()
            False
            >>> _=t.unrecv(b'data')
            >>> t.can_recv()
            True
            >>> _=t.recv()
            >>> t.can_recv()
            False
        """

        if self.buffer:
            return True
        return bool(self.can_recv_raw(timeout))
1✔
1309

1310
    def settimeout(self, timeout):
        """settimeout(timeout)

        Set the timeout for receiving operations. If the string "default"
        is given, then :data:`context.timeout` will be used. If None is given,
        then there will be no timeout.

        This is a plain assignment to ``self.timeout``.

        Examples:

            >>> t = tube()
            >>> t.settimeout_raw = lambda t: None
            >>> t.settimeout(3)
            >>> t.timeout == 3
            True
        """

        # NOTE(review): any interpretation of special values presumably
        # happens in the ``timeout`` attribute of the Timeout base class,
        # which is not visible here -- confirm.
        self.timeout = timeout
1✔
1327

1328

1329
    # Accepted spellings for a direction, mapped to the canonical name that
    # is handed to the raw layer.
    shutdown_directions = dict.fromkeys(('in', 'read', 'recv'), 'recv')
    shutdown_directions.update(dict.fromkeys(('out', 'write', 'send'), 'send'))

    # connected() accepts everything shutdown() does, plus 'any'.
    connected_directions = dict(shutdown_directions, any='any')
1✔
1340

1341
    def shutdown(self, direction = "send"):
        """shutdown(direction = "send")

        Closes the tube for further reading or writing depending on `direction`.

        The given name is translated once through ``shutdown_directions``
        and the canonical name (``'recv'`` or ``'send'``) is passed to
        :meth:`shutdown_raw`.

        Arguments:
          direction(str): Which direction to close; "in", "read" or "recv"
            closes the tube in the ingoing direction, "out", "write" or "send"
            closes it in the outgoing direction.

        Returns:
          :const:`None`

        Raises:
          KeyError: If ``direction`` is not one of the accepted names.

        Examples:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.shutdown_raw = p
            >>> _=list(map(t.shutdown, ('in', 'read', 'recv', 'out', 'write', 'send')))
            recv
            recv
            recv
            send
            send
            send
            >>> t.shutdown('bad_value')
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['in', 'out', 'read', 'recv', 'send', 'write']"
        """
        try:
            direction = self.shutdown_directions[direction]
        except KeyError:
            raise KeyError('direction must be in %r' % sorted(self.shutdown_directions))
        else:
            # ``direction`` is already canonical here; a second table lookup
            # would be redundant (connected() handles it the same way).
            self.shutdown_raw(direction)
1✔
1377

1378
    def connected(self, direction = 'any'):
        """connected(direction = 'any') -> bool

        Returns True if the tube is connected in the specified direction.

        The given name is translated through ``connected_directions`` and
        the canonical name is passed to :meth:`connected_raw`, whose result
        is returned.

        Arguments:
          direction(str): Can be the string 'any', 'in', 'read', 'recv',
                          'out', 'write', 'send'.

        Raises:
          KeyError: If ``direction`` is not one of the accepted names.

        Doctest:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.connected_raw = p
            >>> _=list(map(t.connected, ('any', 'in', 'read', 'recv', 'out', 'write', 'send')))
            any
            recv
            recv
            recv
            send
            send
            send
            >>> t.connected('bad_value')
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['any', 'in', 'out', 'read', 'recv', 'send', 'write']"
        """
        table = self.connected_directions
        try:
            canonical = table[direction]
        except KeyError:
            message = 'direction must be in %r' % sorted(table)
            raise KeyError(message)
        return self.connected_raw(canonical)
1✔
1411

1412
    def __enter__(self):
        """Permit use of 'with' to control scoping and closing sessions.

        Returns the tube itself, so ``with tube(...) as t:`` binds ``t`` to
        this object.  The closing is done by :meth:`__exit__`.

        Examples:

            >>> t = tube()
            >>> def p(x): print(x)
            >>> t.close = lambda: p("Closed!")
            >>> with t: pass
            Closed!
        """
        return self
1✔
1424

1425
    def __exit__(self, type, value, traceback):
        """Handles closing for 'with' statement

        Calls :meth:`close` unconditionally.  The exception details are
        ignored and nothing is returned, so an exception raised inside the
        ``with`` body is not suppressed.

        See :meth:`__enter__`
        """
        self.close()
1✔
1431

1432
    # The minimal interface to be implemented by a child
1433
    @abc.abstractmethod
    def recv_raw(self, numb):
        """recv_raw(numb) -> str

        Should not be called directly. Receives data without using the buffer
        on the object.

        Unless there is a timeout or closed connection, this should always
        return data. In case of a timeout, it should return None, in case
        of a closed connection it should raise an ``exceptions.EOFError``.

        The base implementation always raises ``EOFError('Not implemented')``;
        concrete tubes are expected to override it.
        """

        raise EOFError('Not implemented')
1✔
1446

1447
    @abc.abstractmethod
    def send_raw(self, data):
        """send_raw(data)

        Should not be called directly. Sends data to the tube.

        Should raise ``exceptions.EOFError``, if it is unable to send any
        more, because of a closed tube.

        The base implementation always raises ``EOFError('Not implemented')``;
        concrete tubes are expected to override it.
        """

        raise EOFError('Not implemented')
×
1458

1459
    def settimeout_raw(self, timeout):
        """settimeout_raw(timeout)

        Should not be called directly. Sets the timeout for
        the tube.

        The base implementation raises ``NotImplementedError``, which
        :meth:`timeout_change` catches and ignores.
        """

        raise NotImplementedError()
1✔
1467

1468
    def timeout_change(self):
        """
        Should not be called directly. Informs the raw layer of the tube that the timeout has changed.

        The current ``self.timeout`` is passed to :meth:`settimeout_raw`.
        Tubes that do not implement that hook are tolerated.

        Inherited from :class:`Timeout`.
        """
        try:
            self.settimeout_raw(self.timeout)
        except NotImplementedError:
            # The raw layer has no timeout hook; nothing to propagate.
            pass
1✔
1479

1480
    def can_recv_raw(self, timeout):
        """can_recv_raw(timeout) -> bool

        Should not be called directly. Returns True, if
        there is data available within the timeout, but
        ignores the buffer on the object.

        The base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1489

1490
    def connected_raw(self, direction):
        """connected_raw(direction) -> bool

        Should not be called directly.  Returns True iff the
        tube is connected in the given direction.

        :meth:`connected` calls this with the canonical direction name
        (``'any'``, ``'recv'`` or ``'send'``).  The base implementation
        raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1498

1499
    def close(self):
        """close()

        Closes the tube.

        The base implementation does nothing; concrete tubes override it.
        It is registered with ``atexit`` in ``__init__`` and called by
        :meth:`recvall` and :meth:`__exit__`.
        """
        pass
        # Ideally we could:
        # raise NotImplementedError()
        # But this causes issues with the unit tests.
1508

1509
    def fileno(self):
        """fileno() -> int

        Returns the file number used for reading.

        :meth:`spawn_process` uses this value for the child's stdin, stdout
        and stderr.  The base implementation raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1516

1517
    def shutdown_raw(self, direction):
        """shutdown_raw(direction)

        Should not be called directly.  Closes the tube for further reading or
        writing.

        :meth:`shutdown` calls this with the canonical direction name
        (``'recv'`` or ``'send'``).  The base implementation raises
        ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1525

1526

1527
    # Packing helpers: pack the arguments with the matching function from
    # pwnlib.util.packing and send the result through the tube.

    def p64(self, *a, **kw):
        """Sends ``packing.p64(*a, **kw)`` and returns the result of :meth:`send`."""
        packed = packing.p64(*a, **kw)
        return self.send(packed)

    def p32(self, *a, **kw):
        """Sends ``packing.p32(*a, **kw)`` and returns the result of :meth:`send`."""
        packed = packing.p32(*a, **kw)
        return self.send(packed)

    def p16(self, *a, **kw):
        """Sends ``packing.p16(*a, **kw)`` and returns the result of :meth:`send`."""
        packed = packing.p16(*a, **kw)
        return self.send(packed)

    def p8(self, *a, **kw):
        """Sends ``packing.p8(*a, **kw)`` and returns the result of :meth:`send`."""
        packed = packing.p8(*a, **kw)
        return self.send(packed)

    def pack(self, *a, **kw):
        """Sends ``packing.pack(*a, **kw)`` and returns the result of :meth:`send`."""
        packed = packing.pack(*a, **kw)
        return self.send(packed)

    # Unpacking helpers: receive exactly the needed number of bytes with
    # recvn() and decode them with the matching packing function.

    def u64(self, *a, **kw):
        """Receives 8 bytes and returns ``packing.u64`` of them."""
        raw = self.recvn(8)
        return packing.u64(raw, *a, **kw)

    def u32(self, *a, **kw):
        """Receives 4 bytes and returns ``packing.u32`` of them."""
        raw = self.recvn(4)
        return packing.u32(raw, *a, **kw)

    def u16(self, *a, **kw):
        """Receives 2 bytes and returns ``packing.u16`` of them."""
        raw = self.recvn(2)
        return packing.u16(raw, *a, **kw)

    def u8(self, *a, **kw):
        """Receives 1 byte and returns ``packing.u8`` of it."""
        raw = self.recvn(1)
        return packing.u8(raw, *a, **kw)

    def unpack(self, *a, **kw):
        """Receives ``context.bytes`` bytes and returns ``packing.unpack`` of them."""
        raw = self.recvn(context.bytes)
        return packing.unpack(raw, *a, **kw)

    # Payload builders: build with packing.flat / packing.fit and send.

    def flat(self, *a, **kw):
        """Sends ``packing.flat(*a, **kw)`` and returns the result of :meth:`send`."""
        payload = packing.flat(*a, **kw)
        return self.send(payload)

    def fit(self, *a, **kw):
        """Sends ``packing.fit(*a, **kw)`` and returns the result of :meth:`send`."""
        payload = packing.fit(*a, **kw)
        return self.send(payload)
1!
1541

1542
    # Dynamic functions
1543

1544
    def make_wrapper(func):
        # Build two variants of ``func``: ``<name>b`` returning a bytearray
        # and ``<name>S`` returning the result passed through
        # ``packing._decode``.
        def wrapperb(self, *a, **kw):
            return bytearray(func(self, *a, **kw))
        def wrapperS(self, *a, **kw):
            return packing._decode(func(self, *a, **kw))
        wrapperb.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a bytearray'.format(func=func)
        wrapperb.__name__ = func.__name__ + 'b'
        # The adjacent literals are concatenated first, so .format() applies
        # to the whole message.
        wrapperS.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a str, ' \
                           'decoding the result using `context.encoding`. ' \
                           '(note that the binary versions are way faster)'.format(func=func)
        wrapperS.__name__ = func.__name__ + 'S'
        return wrapperb, wrapperS

    # Install the ``b``/``S`` variants for every receiving method listed.
    # This runs in the class body, where writing to locals() adds names to
    # the class namespace.
    for func in [recv,
                 recvn,
                 recvall,
                 recvrepeat,
                 recvuntil,
                 recvpred,
                 recvregex,
                 recvline,
                 recvline_contains,
                 recvline_startswith,
                 recvline_endswith,
                 recvline_regex]:
        for wrapper in make_wrapper(func):
            locals()[wrapper.__name__] = wrapper

    # Deliberately rebinds ``make_wrapper``: this version creates a plain
    # alias that forwards to ``func`` under the name ``alias``.
    def make_wrapper(func, alias):
        def wrapper(self, *a, **kw):
            # Calls the captured function object directly, not a lookup by
            # name on ``self``.
            return func(self, *a, **kw)
        wrapper.__doc__ = 'Alias for :meth:`{func.__name__}`'.format(func=func)
        wrapper.__name__ = alias
        return wrapper

    # Create ``read*`` aliases for every ``recv*`` name and ``write*``
    # aliases for every ``send*`` name defined so far (including the
    # ``b``/``S`` variants above).  list() snapshots the names because the
    # namespace is modified inside the loop.
    for _name in list(locals()):
        if 'recv' in _name:
            _name2 = _name.replace('recv', 'read')
        elif 'send' in _name:
            _name2 = _name.replace('send', 'write')
        else:
            continue
        locals()[_name2] = make_wrapper(locals()[_name], _name2)

    # Clean up the scope
    # (otherwise these helpers would remain as attributes of the class)
    del wrapper, func, make_wrapper, _name, _name2
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc