• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 39eb5b9f9fc3ac2301be5a7a982096b3568d76e1

01 Nov 2023 10:20PM UTC coverage: 73.405% (+1.9%) from 71.525%
39eb5b9f9fc3ac2301be5a7a982096b3568d76e1

push

github-actions

Arusekk
Update CHANGELOG.md

3902 of 6416 branches covered (0.0%)

12255 of 16695 relevant lines covered (73.41%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.12
/pwnlib/tubes/tube.py
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import
1✔
3
from __future__ import division
1✔
4

5
import abc
1✔
6
import logging
1✔
7
import os
1✔
8
import re
1✔
9
import six
1✔
10
import string
1✔
11
import subprocess
1✔
12
import sys
1✔
13
import threading
1✔
14
import time
1✔
15

16
from six.moves import range
1✔
17

18
from pwnlib import atexit
1✔
19
from pwnlib import term
1✔
20
from pwnlib.context import context
1✔
21
from pwnlib.log import Logger
1✔
22
from pwnlib.timeout import Timeout
1✔
23
from pwnlib.tubes.buffer import Buffer
1✔
24
from pwnlib.util import misc
1✔
25
from pwnlib.util import packing
1✔
26

27

28
class tube(Timeout, Logger):
1✔
29
    """
30
    Container of all the tube functions common to sockets, TTYs and SSH connections.
31
    """
32

33
    default = Timeout.default
1✔
34
    forever = Timeout.forever
1✔
35

36
    def __init__(self, timeout = default, level = None, *a, **kw):
        """Set up the timeout, the logger and the receive buffer of the tube.

        Arguments:
            timeout: Timeout handed to the ``Timeout`` base-class initialiser.
            level: Optional log level; applied through ``setLevel`` unless
                it is :const:`None`.
            a, kw: Forwarded unchanged to the ``Buffer`` constructor.
        """
        super(tube, self).__init__(timeout)
        Logger.__init__(self, None)

        if level is not None:
            self.setLevel(level)

        # No per-tube newline yet: the ``newline`` property falls back to
        # ``context.newline`` for as long as this stays ``None``.
        self._newline = None
        self.buffer = Buffer(*a, **kw)

        # Have ``close`` invoked through pwnlib's atexit handling.
        atexit.register(self.close)
46

47
    @property
    def newline(self):
        r'''Byte sequence used as line terminator by methods such as
        sendline() and recvline().

        A value assigned on the tube takes precedence; otherwise
        ``context.newline`` is returned.

            >>> t = tube()
            >>> t.newline = b'X'
            >>> t.unrecv(b'A\nB\nCX')
            >>> t.recvline()
            b'A\nB\nCX'

            >>> t = tube()
            >>> context.newline = b'\r\n'
            >>> t.newline
            b'\r\n'

            # Clean up
            >>> context.clear()
        '''
        override = self._newline
        return context.newline if override is None else override

    @newline.setter
    def newline(self, newline):
        # The value is normalised through packing._need_bytes before storing.
        self._newline = packing._need_bytes(newline)
72

73
    # Functions based on functions from subclasses
74
    def recv(self, numb = None, timeout = default):
        r"""recv(numb = 4096, timeout = default) -> bytes

        Receives at most `numb` bytes from the tube and returns as soon as
        any data at all is available.

        If nothing arrives before ``timeout`` seconds pass, an empty
        string (``''``) is returned.

        Arguments:
            numb(int): Upper bound on the number of bytes; the effective
                value is chosen by ``self.buffer.get_fill_size(numb)``.
            timeout(int): Timeout for the operation

        Raises:
            exceptions.EOFError: The connection is closed

        Returns:
            A bytes object containing bytes received from the socket,
            or ``''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> # Fake a data source
            >>> t.recv_raw = lambda n: b'Hello, world'
            >>> t.recv() == b'Hello, world'
            True
            >>> t.unrecv(b'Woohoo')
            >>> t.recv() == b'Woohoo'
            True
            >>> with context.local(log_level='debug'):
            ...    _ = t.recv() # doctest: +ELLIPSIS
            [...] Received 0xc bytes:
                b'Hello, world'
        """
        limit = self.buffer.get_fill_size(numb)
        received = self._recv(limit, timeout)
        # Normalise any falsy result to an empty bytes object.
        return received if received else b''
107

108
    def unrecv(self, data):
        """unrecv(data)

        Pushes ``data`` back onto the front of the receive buffer, so that
        it is the next thing returned by a receive call.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'hello'
            >>> t.recv()
            b'hello'
            >>> t.recv()
            b'hello'
            >>> t.unrecv(b'world')
            >>> t.recv()
            b'world'
            >>> t.recv()
            b'hello'
        """
        self.buffer.unget(packing._need_bytes(data))
130

131
    def _fillbuffer(self, timeout = default):
        """_fillbuffer(timeout = default)

        Performs a single :meth:`recv_raw` call and appends whatever it
        returned to the internal buffer.

        Arguments:
            timeout(int): Timeout applied (via ``self.local``) for the
                duration of the :meth:`recv_raw` call.

        Returns:

            The bytes of data received, or a falsy value if no data was
            received.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda *a: b'abc'
            >>> len(t.buffer)
            0
            >>> t._fillbuffer()
            b'abc'
            >>> len(t.buffer)
            3
        """
        chunk = b''

        with self.local(timeout):
            chunk = self.recv_raw(self.buffer.get_fill_size())

        # Nothing arrived: nothing to log and nothing to buffer.
        if not chunk:
            return chunk

        if self.isEnabledFor(logging.DEBUG):
            self.debug('Received %#x bytes:' % len(chunk))
            self.maybe_hexdump(chunk, level=logging.DEBUG)

        self.buffer.add(chunk)
        return chunk
164

165

166
    def _recv(self, numb = None, timeout = default):
        """_recv(numb = 4096, timeout = default) -> bytes

        Returns one chunk of data from the internal buffer, first filling
        the buffer with :meth:`_fillbuffer` when it is empty.
        """
        wanted = self.buffer.get_fill_size(numb)

        if not self.buffer:
            # Buffer is empty; try a single refill and give up if that
            # produced nothing before the timeout.
            if not self._fillbuffer(timeout):
                return b''

        return self.buffer.get(wanted)
180

181
    def recvpred(self, pred, timeout = default):
        """recvpred(pred, timeout = default) -> bytes

        Reads the tube one byte at a time until ``pred(all_bytes)``
        evaluates to True, and returns the accumulated bytes.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Note that any exception raised by the underlying :meth:`recv`
        is caught here and handled exactly like a timeout: the
        accumulated data is put back and ``''`` is returned.

        Arguments:
            pred(callable): Function to call, with the currently-accumulated data.
            timeout(int): Timeout for the operation

        Returns:
            A bytes object containing bytes received from the socket,
            or ``''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'abbbaccc'
            >>> pred = lambda p: p.count(b'a') == 2
            >>> t.recvpred(pred)
            b'abbba'
            >>> pred = lambda p: p.count(b'd') > 0
            >>> t.recvpred(pred, timeout=0.05)
            b''
        """

        collected = b''

        with self.countdown(timeout):
            while not pred(collected):
                byte = b''

                if self.countdown_active():
                    try:
                        byte = self.recv(1, timeout=timeout)
                    except Exception:
                        # Best effort: a failed read is treated like "no data".
                        byte = b''

                if not byte:
                    # Timeout, failed read or empty read: restore what was
                    # consumed so far and report failure.
                    self.unrecv(collected)
                    return b''

                collected += byte

        return collected
234

235
    def recvn(self, numb, timeout = default):
        """recvn(numb, timeout = default) -> bytes

        Receives exactly `numb` bytes.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Raises:
            exceptions.EOFError: The connection closed before the request could be satisfied

        Returns:
            A bytes object containing bytes received from the socket,
            or ``''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> data = b'hello world'
            >>> t.recv_raw = lambda *a: data
            >>> t.recvn(len(data)) == data
            True
            >>> t.recvn(len(data)+1) == data + data[:1]
            True
            >>> t.recv_raw = lambda *a: None
            >>> # The remaining data is buffered
            >>> t.recv() == data[1:]
            True
            >>> t.recv_raw = lambda *a: time.sleep(0.01) or b'a'
            >>> t.recvn(10, timeout=0.05)
            b''
            >>> t.recvn(10, timeout=0.06) # doctest: +ELLIPSIS
            b'aaaaaa...'
        """
        # Data accumulates directly in the tube buffer, so on a timeout
        # there is nothing to put back: it simply stays buffered.
        with self.countdown(timeout):
            while True:
                if not self.countdown_active():
                    break
                if len(self.buffer) >= numb:
                    break
                if not self._fillbuffer(self.timeout):
                    break

        if len(self.buffer) >= numb:
            return self.buffer.get(numb)

        return b''
280

281
    def recvuntil(self, delims, drop=False, timeout=default):
        """recvuntil(delims, drop=False, timeout=default) -> bytes

        Receive data until one of `delims` is encountered.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Arguments:
            delims(bytes,tuple): A single delimiter byte-string, or a tuple/list of delimiter byte-strings.
            drop(bool): Drop the ending.  If :const:`True` it is removed from the end of the return value.
            timeout(int): Timeout for the operation

        Raises:
            exceptions.EOFError: The connection closed before the request could be satisfied

        Returns:
            A bytes object containing bytes received from the socket,
            or ``''`` if a timeout occurred while waiting.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello World!"
            >>> t.recvuntil(b' ')
            b'Hello '
            >>> _=t.clean(0)
            >>> # Matches on 'o' in 'Hello'
            >>> t.recvuntil((b' ',b'W',b'o',b'r'))
            b'Hello'
            >>> _=t.clean(0)
            >>> # Matches expressly full string
            >>> t.recvuntil(b' Wor')
            b'Hello Wor'
            >>> _=t.clean(0)
            >>> # Matches on full string, drops match
            >>> t.recvuntil(b' Wor', drop=True)
            b'Hello'

            >>> # Try with regex special characters
            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello|World"
            >>> t.recvuntil(b'|', drop=True)
            b'Hello'

        """
        # Convert a single delimiter into a singleton tuple
        if isinstance(delims, (bytes, bytearray, six.text_type)):
            delims = (delims,)
        delims = tuple(map(packing._need_bytes, delims))

        # Longest delimiter for tracking purposes
        longest = max(map(len, delims))

        # Cumulative data to search: ``data`` holds chunks that can no
        # longer contain a delimiter, ``top`` is the tail still searched.
        data = []
        top = b''

        with self.countdown(timeout):
            while self.countdown_active():
                try:
                    res = self.recv(timeout=self.timeout)
                except Exception:
                    # Put everything back before propagating the error.
                    self.unrecv(b''.join(data) + top)
                    raise

                if not res:
                    # Timed out while waiting for more data.
                    self.unrecv(b''.join(data) + top)
                    return b''

                top += res

                # Find the earliest occurrence of any delimiter in ``top``.
                start = len(top)
                for d in delims:
                    j = top.find(d)
                    if start > j > -1:
                        start = j
                        end = j + len(d)

                if start < len(top):
                    # Everything after the delimiter goes back to the buffer.
                    self.unrecv(top[end:])
                    if drop:
                        top = top[:start]
                    else:
                        top = top[:end]
                    return b''.join(data) + top

                if len(top) > longest:
                    # Only the last ``longest + 1`` bytes are kept in ``top``
                    # for the next search; the rest is moved to ``data``.
                    i = -longest - 1
                    data.append(top[:i])
                    top = top[i:]

        # The countdown expired between two reads.  Restore everything that
        # was consumed so no data is lost, as on the other timeout path.
        self.unrecv(b''.join(data) + top)
        return b''
370

371
    def recvlines(self, numlines=2**20, keepends=False, timeout=default):
        r"""recvlines(numlines, keepends=False, timeout=default) -> list of bytes objects

        Receive up to ``numlines`` lines.

        A "line" is any sequence of bytes terminated by the byte sequence
        set by :attr:`newline`, which defaults to ``'\n'``.

        Receiving stops early as soon as :meth:`recvline` returns no data
        (for example because ``timeout`` seconds passed); the lines
        received up to that point are returned.

        Arguments:
            numlines(int): Maximum number of lines to receive
            keepends(bool): Keep newlines at the end of each line (:const:`False`).
            timeout(int): Maximum timeout

        Raises:
            exceptions.EOFError: The connection closed before the request could be satisfied

        Returns:
            A list of bytes objects, one per line received.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'\n'
            >>> t.recvlines(3)
            [b'', b'', b'']
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
            >>> t.recvlines(3)
            [b'Foo', b'Bar', b'Baz']
            >>> t.recvlines(3, True)
            [b'Foo\n', b'Bar\n', b'Baz\n']

            >>> # Only the newline sequence itself is removed
            >>> t = tube()
            >>> t.newline = b'\r\n'
            >>> t.recv_raw = lambda n: b'Foo\n\r\nBar\r\n'
            >>> t.recvlines(2)
            [b'Foo\n', b'Bar']
        """
        lines = []
        with self.countdown(timeout):
            for _ in range(numlines):
                try:
                    # We must set 'keepends' to True here so that we can
                    # restore the original, unmodified data to the buffer
                    # in the event of an error.
                    res = self.recvline(keepends=True, timeout=timeout)
                except Exception:
                    self.unrecv(b''.join(lines))
                    raise

                if res:
                    lines.append(res)
                else:
                    break

        if not keepends:
            newline = self.newline
            # Remove exactly one trailing newline sequence.  bytes.rstrip()
            # would treat ``newline`` as a set of characters and could also
            # strip payload bytes that happen to be part of that set.
            if newline:
                cut = len(newline)
                lines = [line[:-cut] if line.endswith(newline) else line
                         for line in lines]

        return lines
427

428
    def recvlinesS(self, numlines=2**20, keepends=False, timeout=default):
        r"""recvlinesS(numlines, keepends=False, timeout=default) -> str list

        Same as :meth:`recvlines`, except that every received line is
        passed through ``packing._decode`` before being returned
        (decoding is done with :func:`context.encoding`).
        You should use :meth:`recvlines` whenever possible for better performance.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'\n'
            >>> t.recvlinesS(3)
            ['', '', '']
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
            >>> t.recvlinesS(3)
            ['Foo', 'Bar', 'Baz']
        """
        raw_lines = self.recvlines(numlines, keepends, timeout)
        return list(map(packing._decode, raw_lines))
446

447
    def recvlinesb(self, numlines=2**20, keepends=False, timeout=default):
        r"""recvlinesb(numlines, keepends=False, timeout=default) -> bytearray list

        Same as :meth:`recvlines`, except that every line is returned
        as a bytearray.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'\n'
            >>> t.recvlinesb(3)
            [bytearray(b''), bytearray(b''), bytearray(b'')]
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
            >>> t.recvlinesb(3)
            [bytearray(b'Foo'), bytearray(b'Bar'), bytearray(b'Baz')]
        """
        raw_lines = self.recvlines(numlines, keepends, timeout)
        return list(map(bytearray, raw_lines))
463

464
    def recvline(self, keepends=True, timeout=default):
        r"""recvline(keepends=True, timeout=default) -> bytes

        Receive a single line from the tube.

        A "line" is any sequence of bytes terminated by the byte sequence
        set in :attr:`newline`, which defaults to ``'\n'``.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Arguments:
            keepends(bool): Keep the line ending (:const:`True`).
            timeout(int): Timeout

        Return:
            All bytes received over the tube up to the first occurrence
            of :attr:`newline`.  Optionally retains the ending.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'Foo\nBar\r\nBaz\n'
            >>> t.recvline()
            b'Foo\n'
            >>> t.recvline()
            b'Bar\r\n'
            >>> t.recvline(keepends = False)
            b'Baz'
            >>> t.newline = b'\r\n'
            >>> t.recvline(keepends = False)
            b'Foo\nBar'
        """
        # A line is simply "everything until the newline sequence".
        drop_ending = not keepends
        return self.recvuntil(self.newline, drop = drop_ending, timeout = timeout)
499

500
    def recvline_pred(self, pred, keepends=False, timeout=default):
        r"""recvline_pred(pred, keepends=False) -> bytes

        Receive data until ``pred(line)`` returns a truthy value.
        Drop all other data.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Arguments:
            pred(callable): Function to call.  Returns the line for which
                this function returns :const:`True`.
            keepends(bool): Keep the newline at the end of the returned
                line (:const:`False`).
            timeout(int): Timeout for the whole operation

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Foo\nBar\nBaz\n"
            >>> t.recvline_pred(lambda line: line == b"Bar\n")
            b'Bar'
            >>> t.recvline_pred(lambda line: line == b"Bar\n", keepends=True)
            b'Bar\n'
            >>> t.recvline_pred(lambda line: line == b'Nope!', timeout=0.1)
            b''
        """

        # Lines rejected by ``pred`` are parked here so they can be put
        # back into the tube buffer if no matching line is found.
        tmpbuf = Buffer()
        line   = b''
        with self.countdown(timeout):
            while self.countdown_active():
                try:
                    line = self.recvline(keepends=True)
                except Exception:
                    self.buffer.unget(tmpbuf)
                    raise

                if not line:
                    # Timed out while waiting for the next line.
                    self.buffer.unget(tmpbuf)
                    return b''

                if pred(line):
                    if not keepends:
                        line = line[:-len(self.newline)]
                    return line
                else:
                    tmpbuf.add(line)

        # The countdown expired between two lines.  Restore the rejected
        # lines so that no data is lost, as on the other timeout path.
        self.buffer.unget(tmpbuf)
        return b''
547

548
    def recvline_contains(self, items, keepends = False, timeout = default):
        r"""
        Keep receiving lines until a line shows up that contains at
        least one of `items`, and return that line.

        Arguments:
            items(str,tuple): List of strings to search for, or a single string.
            keepends(bool): Return lines with newlines if :const:`True`
            timeout(int): Timeout, in seconds

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
            >>> t.recvline_contains(b'r')
            b'World'
            >>> f = lambda n: b"cat dog bird\napple pear orange\nbicycle car train\n"
            >>> t = tube()
            >>> t.recv_raw = f
            >>> t.recvline_contains(b'pear')
            b'apple pear orange'
            >>> t = tube()
            >>> t.recv_raw = f
            >>> t.recvline_contains((b'car', b'train'))
            b'bicycle car train'
        """
        # A lone string is treated as a one-element collection.
        if isinstance(items, (bytes, bytearray, six.text_type)):
            items = (items,)
        needles = tuple(packing._need_bytes(item) for item in items)

        return self.recvline_pred(
            lambda line: any(needle in line for needle in needles),
            keepends,
            timeout)
582

583
    def recvline_startswith(self, delims, keepends=False, timeout=default):
        r"""recvline_startswith(delims, keepends=False, timeout=default) -> bytes

        Keep receiving lines until one is found that starts with one of
        `delims`.  Returns the last line received.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Arguments:
            delims(str,tuple): List of strings to search for, or a single string
            keepends(bool): Return lines with newlines if :const:`True`
            timeout(int): Timeout, in seconds

        Returns:
            The first line received which starts with a delimiter in ``delims``.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'))
            b'World'
            >>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'), True)
            b'Xylophone\n'
            >>> t.recvline_startswith(b'Wo')
            b'World'
        """
        # A lone string is treated as a one-element collection.
        if isinstance(delims, (bytes, bytearray, six.text_type)):
            delims = (delims,)
        prefixes = tuple(packing._need_bytes(delim) for delim in delims)

        def has_known_prefix(line):
            for prefix in prefixes:
                if line.startswith(prefix):
                    return True
            return False

        return self.recvline_pred(has_known_prefix,
                                  keepends=keepends,
                                  timeout=timeout)
619

620
    def recvline_endswith(self, delims, keepends=False, timeout=default):
        r"""recvline_endswith(delims, keepends=False, timeout=default) -> bytes

        Keep receiving lines until one is found that ends with one of
        `delims`.  Returns the last line received.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        See :meth:`recvline_startswith` for more details.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\nKaboodle\n'
            >>> t.recvline_endswith(b'r')
            b'Bar'
            >>> t.recvline_endswith((b'a',b'b',b'c',b'd',b'e'), True)
            b'Kaboodle\n'
            >>> t.recvline_endswith(b'oodle')
            b'Kaboodle'
        """
        # A lone string is treated as a one-element collection.
        if isinstance(delims, (bytes, bytearray, six.text_type)):
            delims = (delims,)

        # Lines are compared with their newline still attached, so the
        # newline is appended to every candidate ending.
        endings = tuple(packing._need_bytes(delim) + self.newline for delim in delims)

        def has_known_ending(line):
            for ending in endings:
                if line.endswith(ending):
                    return True
            return False

        return self.recvline_pred(has_known_ending,
                                  keepends=keepends,
                                  timeout=timeout)
651

652
    def recvregex(self, regex, exact=False, timeout=default, capture=False):
        r"""recvregex(regex, exact=False, timeout=default, capture=False) -> bytes

        Wrapper around :func:`recvpred`, which will return when a regex
        matches the string in the buffer.

        Returns all received data up until the regex matched. If `capture` is
        set to True, the result of applying the regex to that data (a
        :class:`re.Match` object) is returned instead.

        By default :func:`re.RegexObject.search` is used, but if `exact` is
        set to True, then :func:`re.RegexObject.match` will be used instead.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.

        Examples:

            >>> t = tube()
            >>> t.recv_raw = lambda n: b'The lucky number is 1337 as always\nBla blubb blargh\n'
            >>> m = t.recvregex(br'number is ([0-9]+) as always\n', capture=True)
            >>> m.group(1)
            b'1337'
            >>> t.recvregex(br'Bla .* blargh\n')
            b'Bla blubb blargh\n'
        """

        # Strings are compiled here; anything else is used as a compiled
        # pattern as-is.
        if isinstance(regex, (bytes, bytearray, six.text_type)):
            regex = re.compile(packing._need_bytes(regex))

        matcher = regex.match if exact else regex.search
        received = self.recvpred(matcher, timeout = timeout)

        if not capture:
            return received

        # Re-run the matcher on the final data to obtain the match object.
        return matcher(received)
691

692
    def recvline_regex(self, regex, exact=False, keepends=False, timeout=default):
        """recvline_regex(regex, exact=False, keepends=False, timeout=default) -> bytes

        Wrapper around :func:`recvline_pred`, which will return when a regex
        matches a line.

        By default :func:`re.RegexObject.search` is used, but if `exact` is
        set to True, then :func:`re.RegexObject.match` will be used instead.

        If the request is not satisfied before ``timeout`` seconds pass,
        all data is buffered and an empty string (``''``) is returned.
        """

        # Strings are compiled here; anything else is used as a compiled
        # pattern as-is.
        if isinstance(regex, (bytes, bytearray, six.text_type)):
            regex = re.compile(packing._need_bytes(regex))

        matcher = regex.match if exact else regex.search

        return self.recvline_pred(matcher, keepends = keepends, timeout = timeout)
715

716
    def recvrepeat(self, timeout=default):
        """recvrepeat(timeout=default) -> bytes

        Keeps filling the buffer until a read yields no data (timeout) or
        EOF is reached, then returns everything that is buffered.

        Examples:

            >>> data = [
            ... b'd',
            ... b'', # simulate timeout
            ... b'c',
            ... b'b',
            ... b'a',
            ... ]
            >>> def delayrecv(n, data=data):
            ...     return data.pop()
            >>> t = tube()
            >>> t.recv_raw = delayrecv
            >>> t.recvrepeat(0.2)
            b'abc'
            >>> t.recv()
            b'd'
        """

        try:
            got_data = True
            while got_data:
                got_data = self._fillbuffer(timeout=timeout)
        except EOFError:
            # EOF just ends the loop; whatever was buffered is still returned.
            pass

        return self.buffer.get()
747

748
    def recvall(self, timeout=Timeout.forever):
        """recvall(timeout=Timeout.forever) -> bytes

        Receives data until EOF is reached (or a read yields no data),
        closes the tube and returns everything that was buffered.
        """

        with self.waitfor('Receiving all data') as progress:
            shown = len(self.buffer)
            with self.local(timeout):
                try:
                    keep_going = True
                    while keep_going:
                        # Report the amount buffered so far before each read.
                        shown = misc.size(len(self.buffer))
                        progress.status(shown)
                        keep_going = bool(self._fillbuffer())
                except EOFError:
                    pass
            progress.success("Done (%s)" % shown)
        self.close()

        return self.buffer.get()
769

770
    def send(self, data):
        """send(data)

        Sends data.

        If log level ``DEBUG`` is enabled, also prints out the data
        being sent.

        If it is not possible to send anymore because of a closed
        connection, it raises ``exceptions.EOFError``

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.send(b'hello')
            b'hello'
        """

        payload = packing._need_bytes(data)

        debugging = self.isEnabledFor(logging.DEBUG)
        if debugging:
            self.debug('Sent %#x bytes:' % len(payload))
            self.maybe_hexdump(payload, level=logging.DEBUG)

        self.send_raw(payload)
797

798
    def sendline(self, line=b''):
        r"""sendline(data)

        Sends ``data`` followed by the tube's :attr:`newline`; shorthand
        for ``t.send(data + t.newline)``.

        Examples:

            >>> def p(x): print(repr(x))
            >>> t = tube()
            >>> t.send_raw = p
            >>> t.sendline(b'hello')
            b'hello\n'
            >>> t.newline = b'\r\n'
            >>> t.sendline(b'hello')
            b'hello\r\n'
        """

        payload = packing._need_bytes(line)
        self.send(payload + self.newline)
818

819
    def sendlines(self, lines=()):
        """sendlines(lines)

        Sends every element of ``lines`` with :meth:`sendline`, in order.

        Arguments:
            lines(iterable): Lines to send.  Defaults to an empty
                (immutable) tuple, in which case nothing is sent.
        """
        for line in lines:
            line = packing._need_bytes(line)
            self.sendline(line)
823

824
    def sendafter(self, delim, data, timeout = default):
        """sendafter(delim, data, timeout = default) -> bytes

        First waits for ``delim`` with ``recvuntil(delim, timeout=timeout)``,
        then calls ``send(data)``.  Returns what ``recvuntil`` returned.
        """

        payload = packing._need_bytes(data)
        received = self.recvuntil(delim, timeout=timeout)
        self.send(payload)
        return received
834

835
    def sendlineafter(self, delim, data, timeout = default):
        """sendlineafter(delim, data, timeout = default) -> bytes

        First waits for ``delim`` with ``recvuntil(delim, timeout=timeout)``,
        then calls ``sendline(data)``.  Returns what ``recvuntil`` returned.
        """

        payload = packing._need_bytes(data)
        received = self.recvuntil(delim, timeout=timeout)
        self.sendline(payload)
        return received
844

845
    def sendthen(self, delim, data, timeout = default):
        """sendthen(delim, data, timeout = default) -> bytes

        First calls ``send(data)``, then returns the result of
        ``recvuntil(delim, timeout=timeout)``.
        """

        payload = packing._need_bytes(data)
        self.send(payload)
        received = self.recvuntil(delim, timeout=timeout)
        return received
853

854
    def sendlinethen(self, delim, data, timeout = default):
        """sendlinethen(delim, data, timeout = default) -> str

        Transmits ``data`` with ``sendline(data)`` and then waits for
        ``delim`` with ``recvuntil(delim, timeout=timeout)``.

        Returns:
            Whatever ``recvuntil`` returned.
        """
        payload = packing._need_bytes(data)
        self.sendline(payload)
        reply = self.recvuntil(delim, timeout=timeout)
        return reply
×
862

863
    def interactive(self, prompt = term.text.bold_red('$') + ' '):
        """interactive(prompt = pwnlib.term.text.bold_red('$') + ' ')

        Does simultaneous reading and writing to the tube. In principle this just
        connects the tube to standard in and standard out, but in practice this
        is much more usable, since we are using :mod:`pwnlib.term` to print a
        floating prompt.

        Thus it only works while in :data:`pwnlib.term.term_mode`.

        A background thread copies received data to stdout while the calling
        thread forwards user input to the tube.  The method returns once
        input is exhausted, an ``EOFError`` is raised while sending, or the
        user interrupts with Ctrl-C, and the background thread has finished.

        Arguments:
            prompt: Prompt passed to ``term.readline.readline`` when running
                in term mode.
        """

        self.info('Switching to interactive mode')

        # Set by the input loop to tell the receiver thread to stop.
        go = threading.Event()
        def recv_thread():
            while not go.is_set():
                try:
                    cur = self.recv(timeout = 0.05)
                    # Show the tube's newline sequence as a plain line feed.
                    cur = cur.replace(self.newline, b'\n')
                    if cur:
                        stdout = sys.stdout
                        if not term.term_mode:
                            # Outside term mode write to the underlying
                            # ``buffer`` attribute when stdout has one.
                            stdout = getattr(stdout, 'buffer', stdout)
                        stdout.write(cur)
                        stdout.flush()
                except EOFError:
                    self.info('Got EOF while reading in interactive')
                    break

        t = context.Thread(target = recv_thread)
        t.daemon = True
        t.start()

        from pwnlib.args import term_mode
        try:
            os_linesep = os.linesep.encode()
            # Remaining bytes of an OS line separator whose first byte has
            # been read but not yet forwarded.
            to_skip = b''
            while not go.is_set():
                if term.term_mode:
                    data = term.readline.readline(prompt = prompt, float = True)
                    if data:
                        data += self.newline
                else:
                    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
                    data = stdin.read(1)
                    # Keep OS's line separator if NOTERM is set and
                    # the user did not specify a custom newline
                    # even if stdin is a tty.
                    if sys.stdin.isatty() and (
                        term_mode
                        or context.newline != b"\n"
                        or self._newline is not None
                    ):
                        if to_skip:
                            if to_skip[:1] != data:
                                # Not the rest of a separator after all:
                                # emit the withheld prefix plus this byte.
                                # NOTE(review): ``to_skip`` is left untouched
                                # on this path, so the next byte is compared
                                # against it again - confirm this is intended.
                                data = os_linesep[: -len(to_skip)] + data
                            else:
                                to_skip = to_skip[1:]
                                if to_skip:
                                    continue
                                data = self.newline
                        # If we observe a prefix of the line separator in a tty,
                        # assume we'll see the rest of it immediately after.
                        # This could stall until the next character is seen if
                        # the line separator is started but never finished, but
                        # that is unlikely to happen in a dynamic tty.
                        elif data and os_linesep.startswith(data):
                            if len(os_linesep) > 1:
                                to_skip = os_linesep[1:]
                                continue
                            data = self.newline

                if data:
                    try:
                        self.send(data)
                    except EOFError:
                        go.set()
                        self.info('Got EOF while sending in interactive')
                else:
                    # Empty read: no more input, stop both loops.
                    go.set()
        except KeyboardInterrupt:
            self.info('Interrupted')
            go.set()

        # Join in short slices instead of one blocking join().
        while t.is_alive():
            t.join(timeout = 0.1)
1✔
949

950
    def stream(self, line_mode=True):
        """stream(line_mode=True)

        Receive data until the tube exits, and print it to stdout.

        Similar to :func:`interactive`, except that no input is sent.

        Similar to ``print(tube.recvall())`` except that data is printed
        as it is received, rather than after all data is received.

        Receiving stops on ``EOFError`` or ``KeyboardInterrupt``; neither
        is propagated to the caller.

        Arguments:
            line_mode(bool): Whether to receive line-by-line or raw data.

        Returns:
            All data printed.
        """
        buf = Buffer()
        function = self.recvline if line_mode else self.recv
        try:
            while True:
                buf.add(function())
                stdout = sys.stdout
                if not term.term_mode:
                    stdout = getattr(stdout, 'buffer', stdout)
                stdout.write(buf.data[-1])
                # Flush after every chunk, as interactive() does, so the
                # data really is visible as soon as it has been received.
                stdout.flush()
        except (KeyboardInterrupt, EOFError):
            # Both simply end the stream; what was collected is returned.
            pass

        return buf.get()
×
981

982
    def clean(self, timeout = 0.05):
        """clean(timeout = 0.05)

        Drains the tube: with a non-zero ``timeout`` the work is delegated to
        :meth:`recvrepeat`, otherwise only the internal buffer is emptied.

        If ``timeout`` is zero, only cached data will be cleared.

        Note: If timeout is set to zero, the underlying network is
        not actually polled; only the internal buffer is cleared.

        Returns:

            All data received

        Examples:

            >>> t = tube()
            >>> t.unrecv(b'clean me up')
            >>> t.clean(0)
            b'clean me up'
            >>> len(t.buffer)
            0
        """
        if timeout != 0:
            return self.recvrepeat(timeout)

        # Zero timeout: hand back what is buffered without touching the
        # underlying transport.
        return self.buffer.get()
1✔
1010

1011
    def clean_and_log(self, timeout = 0.05):
        r"""clean_and_log(timeout = 0.05)

        Works like :meth:`pwnlib.tubes.tube.tube.clean`, but runs with the
        context log level switched to ``'debug'`` so that the data is logged.
        Data already sitting in the buffer is reported explicitly when debug
        logging was not enabled for this tube.

        Returns:

            All data received

        Examples:

            >>> def recv(n, data=[b'', b'hooray_data']):
            ...     while data: return data.pop()
            >>> t = tube()
            >>> t.recv_raw      = recv
            >>> t.connected_raw = lambda d: True
            >>> t.fileno        = lambda: 1234
            >>> with context.local(log_level='info'):
            ...     data = t.clean_and_log() #doctest: +ELLIPSIS
            [DEBUG] Received 0xb bytes:
                b'hooray_data'
            >>> data
            b'hooray_data'
            >>> context.clear()
        """
        pending = self.buffer.get()
        if pending:
            if not self.isEnabledFor(logging.DEBUG):
                with context.local(log_level='debug'):
                    self.debug('Received %#x bytes:' % len(pending))
                    self.maybe_hexdump(pending, level=logging.DEBUG)

        with context.local(log_level='debug'):
            fresh = self.clean(timeout)
        return pending + fresh
1✔
1044

1045
    def connect_input(self, other):
        """connect_input(other)

        Connects the input of this tube to the output of another tube object.

        A daemon thread is started which repeatedly receives from ``other``
        and sends the data to this tube.  When the pump loop ends, the thread
        calls ``self.shutdown('send')`` and ``other.shutdown('recv')``.

        Arguments:
            other: The tube whose received data is forwarded to this tube.

        Examples:

            >>> def p(x): print(x.decode())
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> import time
            >>> _=(b.connect_input(a), time.sleep(0.1))
            data
        """

        def pump():
            # Thread body: copy from ``other`` to ``self`` until either side
            # disconnects, an EOFError occurs or the countdown runs out.
            import sys as _sys
            while self.countdown_active():
                if not (self.connected('send') and other.connected('recv')):
                    break

                try:
                    # Short timeout so the loop conditions are re-checked often.
                    data = other.recv(timeout = 0.05)
                except EOFError:
                    break

                # NOTE(review): presumably a guard for interpreter shutdown;
                # this path returns without the shutdown() calls below.
                if not _sys:
                    return

                if not data:
                    continue

                try:
                    self.send(data)
                except EOFError:
                    break

                if not _sys:
                    return

            self.shutdown('send')
            other.shutdown('recv')

        t = context.Thread(target = pump)
        t.daemon = True
        t.start()
1✔
1101

1102
    def connect_output(self, other):
        """connect_output(other)

        Connects the output of this tube to the input of another tube object.

        This is the mirror image of :meth:`connect_input`: it simply asks
        ``other`` to take this tube as its input.

        Examples:

            >>> def p(x): print(repr(x))
            >>> def recvone(n, data=[b'data']):
            ...     while data: return data.pop()
            ...     raise EOFError
            >>> a = tube()
            >>> b = tube()
            >>> a.recv_raw = recvone
            >>> b.send_raw = p
            >>> a.connected_raw = lambda d: True
            >>> b.connected_raw = lambda d: True
            >>> a.shutdown      = lambda d: True
            >>> b.shutdown      = lambda d: True
            >>> _=(a.connect_output(b), time.sleep(0.1))
            b'data'
        """
        consumer = other
        consumer.connect_input(self)
1✔
1126

1127
    def connect_both(self, other):
        """connect_both(other)

        Links this tube and ``other`` in both directions: first
        :meth:`connect_input` and then :meth:`connect_output` is called with
        ``other``."""
        peer = other
        self.connect_input(peer)
        self.connect_output(peer)
1✔
1134

1135
    def spawn_process(self, *args, **kwargs):
        """Starts a child process whose stdin, stdout and stderr are all the
        file descriptor reported by :meth:`fileno`.

        All positional and keyword arguments are forwarded to
        :class:`subprocess.Popen`.

        Returns:
            The :class:`subprocess.Popen` object.
        """
        # fileno() is queried once per stream, in stdin/stdout/stderr order.
        fd_in = self.fileno()
        fd_out = self.fileno()
        fd_err = self.fileno()
        return subprocess.Popen(
            *args,
            stdin = fd_in,
            stdout = fd_out,
            stderr = fd_err,
            **kwargs
        )
1147

1148
    def __lshift__(self, other):
        """
        Shorthand for connecting multiple tubes.

        ``a << b`` calls ``a.connect_input(b)`` and evaluates to ``b``, which
        is what allows the operator to be chained.

        See :meth:`connect_input` for more information.

        Examples:

            The following are equivalent ::

                tube_a << tube_b
                tube_a.connect_input(tube_b)

            This is useful when chaining multiple tubes ::

                tube_a << tube_b << tube_a
                tube_a.connect_input(tube_b)
                tube_b.connect_input(tube_a)
        """
        self.connect_input(other)
        return other
×
1169

1170
    def __rshift__(self, other):
        """
        Inverse of the ``<<`` operator.  See :meth:`__lshift__`.

        ``a >> b`` calls ``a.connect_output(b)`` and evaluates to ``b``, so
        the operator can be chained.

        See :meth:`connect_input` for more information.
        """
        self.connect_output(other)
        return other
×
1178

1179
    def __ne__(self, other):
        """
        Shorthand for connecting tubes to each other.

        The following are equivalent ::

            a << b << a
            a <> b

        Note that ``<>`` is only valid syntax on Python 2, and that this
        method is also what ``a != b`` invokes.  Nothing is returned.

        See :meth:`connect_input` for more information.
        """
        # (self << other) evaluates to other, so this is also other << self.
        self << other << self
×
1191

1192
    def wait_for_close(self, timeout=default):
        """Blocks until the tube reports that it is no longer connected, or
        until the countdown started with ``timeout`` is no longer active.

        The connection state is polled, sleeping for at most 0.05 seconds
        between checks.
        """
        with self.countdown(timeout):
            while self.countdown_active() and self.connected():
                poll_interval = min(self.timeout, 0.05)
                time.sleep(poll_interval)
1✔
1200

1201
    wait = wait_for_close
1✔
1202

1203
    def can_recv(self, timeout = 0):
        """can_recv(timeout = 0) -> bool

        Returns True, if there is data available within `timeout` seconds.

        Buffered data counts as available; :meth:`can_recv_raw` is only
        consulted when the internal buffer is empty.

        Examples:

            >>> import time
            >>> t = tube()
            >>> t.can_recv_raw = lambda *a: False
            >>> t.can_recv()
            False
            >>> _=t.unrecv(b'data')
            >>> t.can_recv()
            True
            >>> _=t.recv()
            >>> t.can_recv()
            False
        """
        if self.buffer:
            return True

        return bool(self.can_recv_raw(timeout))
1✔
1224

1225
    def settimeout(self, timeout):
        """settimeout(timeout)

        Set the timeout for receiving operations. If the string "default"
        is given, then :data:`context.timeout` will be used. If None is given,
        then there will be no timeout.

        The value is stored by assigning to ``self.timeout``; any
        interpretation of special values happens in that attribute, not here.

        Examples:

            >>> t = tube()
            >>> t.settimeout_raw = lambda t: None
            >>> t.settimeout(3)
            >>> t.timeout == 3
            True
        """

        self.timeout = timeout
1✔
1242

1243

1244
    # Accepted spellings for a direction, mapped to the canonical name
    # ('recv' or 'send') that is handed to the raw layer.
    shutdown_directions = {
        # ingoing
        'in': 'recv', 'read': 'recv', 'recv': 'recv',
        # outgoing
        'out': 'send', 'write': 'send', 'send': 'send',
    }

    # connected() accepts the same spellings plus 'any'.
    connected_directions = dict(shutdown_directions, any='any')
1✔
1255

1256
    def shutdown(self, direction = "send"):
        """shutdown(direction = "send")

        Closes the tube for further reading or writing depending on `direction`.

        Arguments:
          direction(str): Which direction to close; "in", "read" or "recv"
            closes the tube in the ingoing direction, "out", "write" or "send"
            closes it in the outgoing direction.

        Returns:
          :const:`None`

        Raises:
          KeyError: If `direction` is not a key of ``shutdown_directions``.

        Examples:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.shutdown_raw = p
            >>> _=list(map(t.shutdown, ('in', 'read', 'recv', 'out', 'write', 'send')))
            recv
            recv
            recv
            send
            send
            send
            >>> t.shutdown('bad_value') #doctest: +ELLIPSIS
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['in', 'out', 'read', 'recv', 'send', 'write']"
        """
        try:
            canonical = self.shutdown_directions[direction]
        except KeyError:
            raise KeyError('direction must be in %r' % sorted(self.shutdown_directions))

        # Pass the canonical name straight on, like connected() does.  A
        # second table lookup would only work while every canonical name is
        # itself a key, and would raise a bare KeyError otherwise.
        self.shutdown_raw(canonical)
1✔
1292

1293
    def connected(self, direction = 'any'):
        """connected(direction = 'any') -> bool

        Returns True if the tube is connected in the specified direction.

        The spelling given is translated through ``connected_directions`` and
        the canonical name is handed to :meth:`connected_raw`, whose result
        is returned unchanged.

        Arguments:
          direction(str): Can be the string 'any', 'in', 'read', 'recv',
                          'out', 'write', 'send'.

        Raises:
          KeyError: If `direction` is not a key of ``connected_directions``.

        Doctest:

            >>> def p(x): print(x)
            >>> t = tube()
            >>> t.connected_raw = p
            >>> _=list(map(t.connected, ('any', 'in', 'read', 'recv', 'out', 'write', 'send')))
            any
            recv
            recv
            recv
            send
            send
            send
            >>> t.connected('bad_value') #doctest: +ELLIPSIS
            Traceback (most recent call last):
            ...
            KeyError: "direction must be in ['any', 'in', 'out', 'read', 'recv', 'send', 'write']"
        """
        try:
            canonical = self.connected_directions[direction]
        except KeyError:
            raise KeyError('direction must be in %r' % sorted(self.connected_directions))

        return self.connected_raw(canonical)
1✔
1326

1327
    def __enter__(self):
        """Permit use of 'with' to control scoping and closing sessions.

        Returns:
            The tube itself, so ``with tube(...) as t`` binds the tube.

        Examples:

            >>> t = tube()
            >>> def p(x): print(x)
            >>> t.close = lambda: p("Closed!")
            >>> with t: pass
            Closed!
        """
        return self
1✔
1339

1340
    def __exit__(self, type, value, traceback):
        """Handles closing for 'with' statement

        Calls :meth:`close` unconditionally.  Nothing is returned, so an
        exception raised inside the ``with`` body is not suppressed.

        See :meth:`__enter__`
        """
        self.close()
1✔
1346

1347
    # The minimal interface to be implemented by a child
1348
    @abc.abstractmethod
    def recv_raw(self, numb):
        """recv_raw(numb) -> str

        Should not be called directly. Receives data without using the buffer
        on the object.

        Unless there is a timeout or closed connection, this should always
        return data. In case of a timeout, it should return None, in case
        of a closed connection it should raise an ``exceptions.EOFError``.

        This base implementation always raises ``EOFError``.
        """

        raise EOFError('Not implemented')
1✔
1361

1362
    @abc.abstractmethod
    def send_raw(self, data):
        """send_raw(data)

        Should not be called directly. Sends data to the tube.

        Should raise ``exceptions.EOFError``, if it is unable to send any
        more, because of a closed tube.

        This base implementation always raises ``EOFError``.
        """

        raise EOFError('Not implemented')
×
1373

1374
    def settimeout_raw(self, timeout):
        """settimeout_raw(timeout)

        Should not be called directly. Sets the timeout for
        the tube.

        This base implementation raises ``NotImplementedError``, which
        :meth:`timeout_change` silently ignores.
        """

        raise NotImplementedError()
1✔
1382

1383
    def timeout_change(self):
        """
        Should not be called directly. Informs the raw layer of the tube that the timeout has changed.

        The current ``self.timeout`` is passed to :meth:`settimeout_raw`.

        Inherited from :class:`Timeout`.
        """
        try:
            self.settimeout_raw(self.timeout)
        except NotImplementedError:
            # Tubes that do not implement settimeout_raw() have nothing to
            # update, so the missing implementation is not an error here.
            pass
1✔
1394

1395
    def can_recv_raw(self, timeout):
        """can_recv_raw(timeout) -> bool

        Should not be called directly. Returns True, if
        there is data available within the timeout, but
        ignores the buffer on the object.

        This base implementation always raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1404

1405
    def connected_raw(self, direction):
        """connected_raw(direction) -> bool

        Should not be called directly.  Returns True iff the
        tube is connected in the given direction.

        :meth:`connected` passes one of the canonical names 'any', 'recv'
        or 'send'.  This base implementation always raises
        ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1413

1414
    def close(self):
        """close()

        Closes the tube.

        This base implementation does nothing.
        """
        pass
        # Ideally we could:
        # raise NotImplementedError()
        # But this causes issues with the unit tests.
1423

1424
    def fileno(self):
        """fileno() -> int

        Returns the file number used for reading.

        This base implementation always raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1431

1432
    def shutdown_raw(self, direction):
        """shutdown_raw(direction)

        Should not be called directly.  Closes the tube for further reading or
        writing.

        This base implementation always raises ``NotImplementedError``.
        """

        raise NotImplementedError()
×
1440

1441

1442
    # Packing senders: each forwards its arguments to the function of the
    # same name in ``packing`` and sends the result with send().
    def p64(self, *a, **kw):        return self.send(packing.p64(*a, **kw))
    def p32(self, *a, **kw):        return self.send(packing.p32(*a, **kw))
    def p16(self, *a, **kw):        return self.send(packing.p16(*a, **kw))
    def p8(self, *a, **kw):         return self.send(packing.p8(*a, **kw))
    def pack(self, *a, **kw):       return self.send(packing.pack(*a, **kw))
1✔
1447

1448
    # Unpacking receivers: each reads a fixed number of bytes with recvn()
    # (8, 4, 2, 1, or context.bytes for unpack) and passes them, followed by
    # the caller's arguments, to the function of the same name in ``packing``.
    def u64(self, *a, **kw):        return packing.u64(self.recvn(8), *a, **kw)
    def u32(self, *a, **kw):        return packing.u32(self.recvn(4), *a, **kw)
    def u16(self, *a, **kw):        return packing.u16(self.recvn(2), *a, **kw)
    def u8(self, *a, **kw):         return packing.u8(self.recvn(1), *a, **kw)
    def unpack(self, *a, **kw):     return packing.unpack(self.recvn(context.bytes), *a, **kw)
1✔
1453

1454
    # Send the result of packing.flat() / packing.fit() built from the
    # given arguments.
    def flat(self, *a, **kw):       return self.send(packing.flat(*a,**kw))
    def fit(self, *a, **kw):        return self.send(packing.fit(*a, **kw))
1!
1456

1457
    # Dynamic functions
1458

1459
    def make_wrapper(func):
        # Builds two variants of a receive method: ``<name>b`` converts the
        # result to a bytearray, ``<name>S`` decodes it with packing._decode.
        # Returns them as the pair (bytearray variant, str variant).
        base_name = func.__name__

        def wrapperb(self, *a, **kw):
            return bytearray(func(self, *a, **kw))
        wrapperb.__name__ = base_name + 'b'
        wrapperb.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a bytearray'.format(func=func)

        def wrapperS(self, *a, **kw):
            return packing._decode(func(self, *a, **kw))
        wrapperS.__name__ = base_name + 'S'
        wrapperS.__doc__ = 'Same as :meth:`{func.__name__}`, but returns a str, ' \
                           'decoding the result using `context.encoding`. ' \
                           '(note that the binary versions are way faster)'.format(func=func)

        return wrapperb, wrapperS
1✔
1471

1472
    # For every receive method listed here, create its ``b`` (bytearray) and
    # ``S`` (str) variants and bind them in the class namespace under the
    # names make_wrapper() assigned, e.g. ``recvb`` and ``recvS``.
    for func in [recv,
                 recvn,
                 recvall,
                 recvrepeat,
                 recvuntil,
                 recvpred,
                 recvregex,
                 recvline,
                 recvline_contains,
                 recvline_startswith,
                 recvline_endswith,
                 recvline_regex]:
        for wrapper in make_wrapper(func):
            # locals() is the class body's namespace at this point.
            locals()[wrapper.__name__] = wrapper
1✔
1486

1487
    def make_wrapper(func, alias):
        # Returns a function named ``alias`` that forwards every call,
        # arguments untouched, to ``func``.
        def wrapper(self, *a, **kw):
            return func(self, *a, **kw)

        wrapper.__name__ = alias
        wrapper.__doc__ = 'Alias for :meth:`{func.__name__}`'.format(func=func)
        return wrapper
1✔
1493

1494
    # Give every class-level name containing 'recv' a 'read' twin and every
    # name containing 'send' a 'write' twin (e.g. recvline -> readline,
    # sendline -> writeline).  The names are snapshotted with list() because
    # the namespace is modified inside the loop.
    for _name in list(locals()):
        if 'recv' in _name:
            _name2 = _name.replace('recv', 'read')
        elif 'send' in _name:
            _name2 = _name.replace('send', 'write')
        else:
            continue
        locals()[_name2] = make_wrapper(locals()[_name], _name2)

    # Clean up the scope
    # so the loop variables and helper do not become class attributes.
    del wrapper, func, make_wrapper, _name, _name2
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc