• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 4c743a92f3eed7759ae5f16bdf9079e7854f0003

pending completion
4c743a92f3eed7759ae5f16bdf9079e7854f0003

push

github-actions

gogo
Fix testing.

3938 of 6504 branches covered (60.55%)

17 of 17 new or added lines in 1 file covered. (100.0%)

12343 of 16873 relevant lines covered (73.15%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.14
/pwnlib/term/term.py
1
from __future__ import absolute_import
1✔
2
from __future__ import division
1✔
3

4
import atexit
1✔
5
import errno
1✔
6
import os
1✔
7
import re
1✔
8
import signal
1✔
9
import six
1✔
10
import struct
1✔
11
import sys
1✔
12
import threading
1✔
13
import traceback
1✔
14

15
if sys.platform != 'win32':
1!
16
    import fcntl
1✔
17
    import termios
1✔
18

19
from pwnlib.context import ContextType
1✔
20
from pwnlib.term import termcap
1✔
21

22
__all__ = ['output', 'init']
1✔
23

24
# we assume no terminal can display more lines than this
25
MAX_TERM_HEIGHT = 200
1✔
26

27
# default values
28
width = 80
1✔
29
height = 25
1✔
30

31
# list of callbacks triggered on SIGWINCH
32
on_winch = []
1✔
33

34

35

36
settings = None
1✔
37
_graphics_mode = False
1✔
38

39
fd = sys.stdout
1✔
40

41
def show_cursor():
1✔
42
    do('cnorm')
1✔
43

44
def hide_cursor():
1✔
45
    do('civis')
1✔
46

47
def update_geometry():
1✔
48
    global width, height
49
    hw = fcntl.ioctl(fd.fileno(), termios.TIOCGWINSZ, '1234')
1✔
50
    h, w = struct.unpack('hh', hw)
1✔
51
    # if the window shrunk and theres still free space at the bottom move
52
    # everything down
53
    if h < height and scroll == 0:
1✔
54
        if cells and cells[-1].end[0] < 0:
1!
55
            delta = min(height - h, 1 - cells[-1].end[0])
×
56
            for cell in cells:
×
57
                cell.end = (cell.end[0] + delta, cell.end[1])
×
58
                cell.start = (cell.start[0] + delta, cell.start[1])
×
59
    height, width = h, w
1✔
60

61
def handler_sigwinch(signum, stack):
1✔
62
    if hasattr(signal, 'pthread_sigmask'):
1!
63
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGWINCH})
×
64
    update_geometry()
1✔
65
    redraw()
1✔
66
    for cb in on_winch:
1!
67
        cb()
×
68
    if hasattr(signal, 'pthread_sigmask'):
1!
69
        signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGWINCH})
×
70

71
def handler_sigstop(signum, stack):
1✔
72
    resetterm()
×
73
    os.kill(os.getpid(), signal.SIGSTOP)
×
74

75
def handler_sigcont(signum, stack):
1✔
76
    setupterm()
×
77
    redraw()
×
78

79
def setupterm():
1✔
80
    global settings
81
    update_geometry()
1✔
82
    hide_cursor()
1✔
83
    do('smkx') # keypad mode
1✔
84
    if not settings:
1!
85
        settings = termios.tcgetattr(fd.fileno())
1✔
86
    mode = termios.tcgetattr(fd.fileno())
1✔
87
    IFLAG = 0
1✔
88
    OFLAG = 1
1✔
89
    CFLAG = 2
1✔
90
    LFLAG = 3
1✔
91
    ISPEED = 4
1✔
92
    OSPEED = 5
1✔
93
    CC = 6
1✔
94
    mode[LFLAG] = mode[LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN)
1✔
95
    mode[CC][termios.VMIN] = 1
1✔
96
    mode[CC][termios.VTIME] = 0
1✔
97
    termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
1✔
98

99
def resetterm():
1✔
100
    if settings:
1!
101
        termios.tcsetattr(fd.fileno(), termios.TCSADRAIN, settings)
1✔
102
    show_cursor()
1✔
103
    do('rmkx')
1✔
104
    fd.write(' \x08') # XXX: i don't know why this is needed...
1✔
105
                      #      only necessary when suspending the process
106

107
def init():
1✔
108
    atexit.register(resetterm)
1✔
109
    setupterm()
1✔
110
    signal.signal(signal.SIGWINCH, handler_sigwinch)
1✔
111
    signal.signal(signal.SIGTSTP, handler_sigstop)
1✔
112
    signal.signal(signal.SIGCONT, handler_sigcont)
1✔
113
    # we start with one empty cell at the current cursor position
114
    put('\x1b[6n')
1✔
115
    fd.flush()
1✔
116
    s = ''
1✔
117
    while True:
1✔
118
        try:
1✔
119
            c = os.read(fd.fileno(), 1)
1✔
120
        except OSError as e:
1✔
121
            if e.errno != errno.EINTR:
1!
122
                raise
×
123
            continue
×
124
        if not isinstance(c, six.string_types):
1!
125
            c = c.decode('utf-8')
×
126
        s += c
1✔
127
        if c == 'R':
1✔
128
            break
1✔
129
    row, col = re.findall('\x1b' + r'\[(\d*);(\d*)R', s)[0]
1✔
130
    row = int(row) - height
1✔
131
    col = int(col) - 1
1✔
132
    cell = Cell()
1✔
133
    cell.start = (row, col)
1✔
134
    cell.end = (row, col)
1✔
135
    cell.content = []
1✔
136
    cell.frozen = True
1✔
137
    cell.float = 0
1✔
138
    cell.indent = 0
1✔
139
    cells.append(cell)
1✔
140
    class Wrapper:
1✔
141
        def __init__(self, fd):
1✔
142
            self._fd = fd
1✔
143
        def write(self, s):
1✔
144
            output(s, frozen = True)
1✔
145
        def __getattr__(self, k):
1✔
146
            return getattr(self._fd, k)
1✔
147
    if sys.stdout.isatty():
1!
148
        sys.stdout = Wrapper(sys.stdout)
1✔
149
    if sys.stderr.isatty():
1!
150
        sys.stderr = Wrapper(sys.stderr)
×
151

152
    console = ContextType.defaults['log_console']
1✔
153
    if console.isatty():
1!
154
        ContextType.defaults['log_console'] = Wrapper(console)
1✔
155

156
    # freeze all cells if an exception is thrown
157
    orig_hook = sys.excepthook
1✔
158
    def hook(*args):
1✔
159
        resetterm()
×
160
        for c in cells:
×
161
            c.frozen = True
×
162
            c.float = 0
×
163
        if orig_hook:
×
164
            orig_hook(*args)
×
165
        else:
166
            traceback.print_exception(*args)
×
167
        # this is a bit esoteric
168
        # look here for details: https://stackoverflow.com/questions/12790328/how-to-silence-sys-excepthook-is-missing-error
169
        if fd.fileno() == 2:
×
170
            os.close(fd.fileno())
×
171
    sys.excepthook = hook
1✔
172

173
def put(s):
1✔
174
    if not isinstance(s, six.string_types):
1!
175
        s = s.decode('utf-8')
×
176
    fd.write(s)
1✔
177

178
def flush(): fd.flush()
1✔
179

180
def do(c, *args):
1✔
181
    s = termcap.get(c, *args)
1✔
182
    if s:
1!
183
        put(s)
1✔
184

185
def goto(r, c):
1✔
186
    do('cup', r - scroll + height - 1, c)
1✔
187

188
cells = []
1✔
189
scroll = 0
1✔
190

191
class Cell(object):
1✔
192
    pass
1✔
193

194
class Handle:
1✔
195
    def __init__(self, cell, is_floating):
1✔
196
        self.h = id(cell)
1✔
197
        self.is_floating = is_floating
1✔
198
    def update(self, s):
1✔
199
        update(self.h, s)
1✔
200
    def freeze(self):
1✔
201
        freeze(self.h)
×
202
    def delete(self):
1✔
203
        delete(self.h)
1✔
204

205
STR, CSI, LF, BS, CR, SOH, STX, OOB = range(8)
1✔
206
def parse_csi(buf, offset):
1✔
207
    i = offset
1✔
208
    while i < len(buf):
1!
209
        c = buf[i]
1✔
210
        if c >= 0x40 and c < 0x80:
1✔
211
            break
1✔
212
        i += 1
1✔
213
    if i >= len(buf):
1!
214
        return
×
215
    end = i
1✔
216
    cmd = [c, None, None]
1✔
217
    i = offset
1✔
218
    in_num = False
1✔
219
    args = []
1✔
220
    if buf[i] >= ord('<') and buf[i] <= ord('?'):
1!
221
        cmd[1] = buf[i]
×
222
        i += 1
×
223
    while i < end:
1✔
224
        c = buf[i]
1✔
225
        if   c >= ord('0') and c <= ord('9'):
1!
226
            if not in_num:
1✔
227
                args.append(c - ord('0'))
1✔
228
                in_num = True
1✔
229
            else:
230
                args[-1] = args[-1] * 10 + c - ord('0')
1✔
231
        elif c == ord(';'):
×
232
            if not in_num:
×
233
                args.append(None)
×
234
            in_num = False
×
235
            if len(args) > 16:
×
236
                break
×
237
        elif c >= 0x20 and c <= 0x2f:
×
238
            cmd[2] = c
×
239
            break
×
240
        i += 1
1✔
241
    return cmd, args, end + 1
1✔
242

243
def parse_utf8(buf, offset):
1✔
244
    c0 = buf[offset]
×
245
    n = 0
×
246
    if   c0 & 0b11100000 == 0b11000000:
×
247
        n = 2
×
248
    elif c0 & 0b11110000 == 0b11100000:
×
249
        n = 3
×
250
    elif c0 & 0b11111000 == 0b11110000:
×
251
        n = 4
×
252
    elif c0 & 0b11111100 == 0b11111000:
×
253
        n = 5
×
254
    elif c0 & 0b11111110 == 0b11111100:
×
255
        n = 6
×
256
    if n:
×
257
        return offset + n
×
258

259
def parse(s):
1✔
260
    global _graphics_mode
261
    if isinstance(s, six.text_type):
1!
262
        s = s.encode('utf8')
×
263
    out = []
1✔
264
    buf = bytearray(s)
1✔
265
    i = 0
1✔
266
    while i < len(buf):
1✔
267
        x = None
1✔
268
        c = buf[i]
1✔
269
        if c >= 0x20 and c <= 0x7e:
1✔
270
            x = (STR, [six.int2byte(c)])
1✔
271
            i += 1
1✔
272
        elif c & 0xc0:
1!
273
            j = parse_utf8(buf, i)
×
274
            if j:
×
275
                x = (STR, [b''.join(map(six.int2byte, buf[i : j]))])
×
276
                i = j
×
277
        elif c == 0x1b and len(buf) > i + 1:
1✔
278
            c1 = buf[i + 1]
1✔
279
            if   c1 == ord('['):
1!
280
                ret = parse_csi(buf, i + 2)
1✔
281
                if ret:
1!
282
                    cmd, args, j = ret
1✔
283
                    x = (CSI, (cmd, args, b''.join(map(six.int2byte, buf[i : j]))))
1✔
284
                    i = j
1✔
285
            elif c1 == ord(']'):
×
286
                # XXX: this is a dirty hack:
287
                #  we still need to do our homework on this one, but what we do
288
                #  here is supporting setting the terminal title and updating
289
                #  the color map.  we promise to do it properly in the next
290
                #  iteration of this terminal emulation/compatibility layer
291
                #  related: https://unix.stackexchange.com/questions/5936/can-i-set-my-local-machines-terminal-colors-to-use-those-of-the-machine-i-ssh-i
292
                try:
×
293
                    j = s.index('\x07', i)
×
294
                except Exception:
×
295
                    try:
×
296
                        j = s.index('\x1b\\', i)
×
297
                    except Exception:
×
298
                        j = 1
×
299
                x = (OOB, s[i:j + 1])
×
300
                i = j + 1
×
301
            elif c1 in map(ord, '()'): # select G0 or G1
×
302
                i += 3
×
303
                continue
×
304
            elif c1 in map(ord, '>='): # set numeric/application keypad mode
×
305
                i += 2
×
306
                continue
×
307
            elif c1 == ord('P'):
×
308
                _graphics_mode = True
×
309
                i += 2
×
310
                continue
×
311
            elif c1 == ord('\\'):
×
312
                _graphics_mode = False
×
313
                i += 2
×
314
                continue
×
315
        elif c == 0x01:
1!
316
            x = (SOH, None)
×
317
            i += 1
×
318
        elif c == 0x02:
1!
319
            x = (STX, None)
×
320
            i += 1
×
321
        elif c == 0x08:
1!
322
            x = (BS, None)
×
323
            i += 1
×
324
        elif c == 0x09:
1!
325
            x = (STR, [b'    ']) # who the **** uses tabs anyway?
×
326
            i += 1
×
327
        elif c == 0x0a:
1!
328
            x = (LF, None)
1✔
329
            i += 1
1✔
330
        elif c == 0x0d:
×
331
            x = (CR, None)
×
332
            i += 1
×
333

334
        if x is None:
1!
335
            x = (STR, [six.int2byte(c) for c in bytearray(b'\\x%02x' % c)])
×
336
            i += 1
×
337

338
        if _graphics_mode:
1!
339
            continue
×
340

341
        if x[0] == STR and out and out[-1][0] == STR:
1✔
342
            out[-1][1].extend(x[1])
1✔
343
        else:
344
            out.append(x)
1✔
345
    return out
1✔
346

347
saved_cursor = None
1✔
348
# XXX: render cells that is half-way on the screen
349
def render_cell(cell, clear_after = False):
1✔
350
    global scroll, saved_cursor
351
    row, col = cell.start
1✔
352
    row = row - scroll + height - 1
1✔
353
    if row < 0:
1!
354
        return
×
355
    indent = min(cell.indent, width - 1)
1✔
356
    for t, x in cell.content:
1✔
357
        if   t == STR:
1✔
358
            i = 0
1✔
359
            while i < len(x):
1✔
360
                if col >= width:
1!
361
                    col = 0
×
362
                    row += 1
×
363
                if col < indent:
1!
364
                    put(' ' * (indent - col))
×
365
                    col = indent
×
366
                c = x[i]
1✔
367
                if not hasattr(c, 'encode'):
1!
368
                    c = c.decode('utf-8', 'backslashreplace')
×
369
                put(c)
1✔
370
                col += 1
1✔
371
                i += 1
1✔
372
        elif t == CSI:
1✔
373
            cmd, args, c = x
1✔
374
            put(c)
1✔
375
            # figure out if the cursor moved (XXX: here probably be bugs)
376
            if cmd[1] is None and cmd[2] is None:
1!
377
                c = cmd[0]
1✔
378
                if len(args) >= 1:
1✔
379
                    n = args[0]
1✔
380
                else:
381
                    n = None
1✔
382
                if len(args) >= 2:
1!
383
                    m = args[1]
×
384
                else:
385
                    m = None
1✔
386
                if   c == ord('A'):
1!
387
                    n = n or 1
×
388
                    row = max(0, row - n)
×
389
                elif c == ord('B'):
1!
390
                    n = n or 1
×
391
                    row = min(height - 1, row + n)
×
392
                elif c == ord('C'):
1!
393
                    n = n or 1
×
394
                    col = min(width - 1, col + n)
×
395
                elif c == ord('D'):
1!
396
                    n = n or 1
×
397
                    col = max(0, col - n)
×
398
                elif c == ord('E'):
1!
399
                    n = n or 1
×
400
                    row = min(height - 1, row + n)
×
401
                    col = 0
×
402
                elif c == ord('F'):
1!
403
                    n = n or 1
×
404
                    row = max(0, row - n)
×
405
                    col = 0
×
406
                elif c == ord('G'):
1!
407
                    n = n or 1
×
408
                    col = min(width - 1, n - 1)
×
409
                elif c == ord('H') or c == ord('f'):
1!
410
                    n = n or 1
×
411
                    m = m or 1
×
412
                    row = min(height - 1, n - 1)
×
413
                    col = min(width - 1, m - 1)
×
414
                elif c == ord('S'):
1!
415
                    n = n or 1
×
416
                    scroll += n
×
417
                    row = max(0, row - n)
×
418
                elif c == ord('T'):
1!
419
                    n = n or 1
×
420
                    scroll -= n
×
421
                    row = min(height - 1, row + n)
×
422
                elif c == ord('s'):
1!
423
                    saved_cursor = row, col
×
424
                elif c == ord('u'):
1!
425
                    if saved_cursor:
×
426
                        row, col = saved_cursor
×
427
        elif t == LF:
1!
428
            if clear_after and col <= width - 1:
1!
429
                put('\x1b[K') # clear line
1✔
430
            put('\n')
1✔
431
            col = 0
1✔
432
            row += 1
1✔
433
        elif t == BS:
×
434
            if col > 0:
×
435
                put('\x08')
×
436
                col -= 1
×
437
        elif t == CR:
×
438
            put('\r')
×
439
            col = 0
×
440
        elif t == SOH:
×
441
            put('\x01')
×
442
        elif t == STX:
×
443
            put('\x02')
×
444
        elif t == OOB:
×
445
            put(x)
×
446
        if row >= height:
1✔
447
            d = row - height + 1
1✔
448
            scroll += d
1✔
449
            row -= d
1✔
450
    row = row + scroll - height + 1
1✔
451
    cell.end = (row, col)
1✔
452

453
def render_from(i, force = False, clear_after = False):
1✔
454
    e = None
1✔
455
    # `i` should always be a valid cell, but in case i f***ed up somewhere, I'll
456
    # check it and just do nothing if something went wrong.
457
    if i < 0 or i >= len(cells):
1!
458
        return
×
459
    goto(*cells[i].start)
1✔
460
    for c in cells[i:]:
1✔
461
        if not force and c.start == e:
1✔
462
            goto(*cells[-1].end)
1✔
463
            break
1✔
464
        elif e:
1✔
465
            c.start = e
1✔
466
        render_cell(c, clear_after = clear_after)
1✔
467
        e = c.end
1✔
468
    if clear_after and (e[0] < scroll or e[1] < width - 1):
1!
469
        put('\x1b[J')
1✔
470
    flush()
1✔
471

472
def redraw():
1✔
473
    for i in reversed(range(len(cells))):
1!
474
        row = cells[i].start[0]
×
475
        if row - scroll + height <= 0:
×
476
            # XXX: remove this line when render_cell is fixed
477
            i += 1
×
478
            break
×
479
    else:
480
        if not cells:
1!
481
            return
1✔
482
    render_from(i, force = True, clear_after = True)
×
483

484
lock = threading.Lock()
1✔
485
def output(s = '', float = False, priority = 10, frozen = False,
1✔
486
            indent = 0, before = None, after = None):
487
    with lock:
1✔
488
        rel = before or after
1✔
489
        if rel:
1!
490
            i, _ = find_cell(rel.h)
×
491
            is_floating = rel.is_floating
×
492
            float = cells[i].float
×
493
            if before:
×
494
                i -= 1
×
495
        elif float and priority:
1✔
496
            is_floating = True
1✔
497
            float = priority
1✔
498
            for i in reversed(range(len(cells))):
1!
499
                if cells[i].float <= float:
1!
500
                    break
1✔
501
        else:
502
            is_floating = False
1✔
503
            i = len(cells) - 1
1✔
504
            while i > 0 and cells[i].float:
1✔
505
                i -= 1
1✔
506
        # put('xx %d\n' % i)
507
        cell = Cell()
1✔
508
        cell.content = parse(s)
1✔
509
        cell.frozen = frozen
1✔
510
        cell.float = float
1✔
511
        cell.indent = indent
1✔
512
        cell.start = cells[i].end
1✔
513
        i += 1
1✔
514
        cells.insert(i, cell)
1✔
515
        h = Handle(cell, is_floating)
1✔
516
        if not s:
1✔
517
            cell.end = cell.start
1✔
518
            return h
1✔
519
        # the invariant is that the cursor is placed after the last cell
520
        if i == len(cells) - 1:
1✔
521
            render_cell(cell, clear_after = True)
1✔
522
            flush()
1✔
523
        else:
524
            render_from(i, clear_after = True)
1✔
525
        return h
1✔
526

527
def find_cell(h):
1✔
528
    for i, c in enumerate(cells):
1!
529
        if id(c) == h:
1✔
530
            return i, c
1✔
531
    raise KeyError
×
532

533
def discard_frozen():
1✔
534
    # we assume that no cell will shrink very much and that noone has space
535
    # for more than MAX_TERM_HEIGHT lines in their terminal
536
    while len(cells) > 1 and scroll - cells[0].end[0] > MAX_TERM_HEIGHT:
1!
537
        c = cells.pop(0)
×
538
        del c # trigger GC maybe, kthxbai
×
539

540
def update(h, s):
1✔
541
    with lock:
1✔
542
        try:
1✔
543
            i, c = find_cell(h)
1✔
544
        except KeyError:
×
545
            return
×
546
        if not c.frozen and c.content != s:
1!
547
            c.content = parse(s)
1✔
548
            render_from(i, clear_after = True)
1✔
549

550
def freeze(h):
1✔
551
    try:
1✔
552
        i, c = find_cell(h)
1✔
553
        c.frozen = True
1✔
554
        c.float = 0
1✔
555
        if c.content == []:
1!
556
            cells.pop(i)
1✔
557
        discard_frozen()
1✔
558
    except KeyError:
×
559
        return
×
560

561
def delete(h):
1✔
562
    update(h, '')
1✔
563
    freeze(h)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc