• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 5f1e8a43414155cfd080df00632ea5c9a0b736d1-PR-1763

pending completion
5f1e8a43414155cfd080df00632ea5c9a0b736d1-PR-1763

Pull #1763

github-actions

web-flow
Merge 8d7613a41 into b73e74f1f
Pull Request #1763: Add env_add to the process module

3640 of 6422 branches covered (56.68%)

2 of 2 new or added lines in 1 file covered. (100.0%)

11697 of 16708 relevant lines covered (70.01%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.03
/pwnlib/term/term.py
1
from __future__ import absolute_import
1✔
2
from __future__ import division
1✔
3

4
import atexit
1✔
5
import errno
1✔
6
import os
1✔
7
import re
1✔
8
import signal
1✔
9
import six
1✔
10
import struct
1✔
11
import sys
1✔
12
import threading
1✔
13
import traceback
1✔
14

15
if sys.platform != 'win32':
1!
16
    import fcntl
1✔
17
    import termios
1✔
18

19
from pwnlib.context import ContextType
1✔
20
from pwnlib.term import termcap
1✔
21

22
__all__ = ['output', 'init']
1✔
23

24
# Upper bound on the number of lines any terminal is assumed to display.
MAX_TERM_HEIGHT = 200

# Terminal geometry; these defaults are replaced by update_geometry().
width, height = 80, 25

# Callbacks invoked (without arguments) whenever SIGWINCH is handled.
on_winch = list()

# termios attributes saved by setupterm() so resetterm() can restore them;
# None until setupterm() has run.
settings = None

# Set while parse() is between an `ESC P` and an `ESC \` sequence; tokens
# produced in that state are dropped.
_graphics_mode = False

# Stream that all terminal control sequences and cell content are written to.
fd = sys.stdout
1✔
40

41
def show_cursor():
    """Emit the terminal's ``cnorm`` capability (make the cursor visible)."""
    do('cnorm')
×
43

44
def hide_cursor():
    """Emit the terminal's ``civis`` capability (make the cursor invisible)."""
    do('civis')
×
46

47
def update_geometry():
    """Re-read the terminal size and store it in ``width`` / ``height``.

    The size is obtained with the ``TIOCGWINSZ`` ioctl on ``fd``.  When the
    window got shorter, nothing has been scrolled yet and the last cell ends
    above the bottom line, every cell is shifted down so the content stays
    anchored to the bottom of the window.
    """
    global width, height
    winsize = fcntl.ioctl(fd.fileno(), termios.TIOCGWINSZ, '1234')
    new_height, new_width = struct.unpack('hh', winsize)
    # if the window shrunk and there is still free space at the bottom, move
    # everything down
    window_shrunk = new_height < height
    if window_shrunk and scroll == 0 and cells and cells[-1].end[0] < 0:
        last_row = cells[-1].end[0]
        delta = min(height - new_height, 1 - last_row)
        for cell in cells:
            cell.end = (cell.end[0] + delta, cell.end[1])
            cell.start = (cell.start[0] + delta, cell.start[1])
    height, width = new_height, new_width
×
60

61
def handler_sigwinch(signum, stack):
    """Signal handler for SIGWINCH (terminal window resized).

    Refreshes the cached geometry, redraws all cells and then calls every
    callback registered in ``on_winch``.  Where ``signal.pthread_sigmask`` is
    available, SIGWINCH is blocked for the duration of the handler.

    Arguments:
        signum: signal number (unused).
        stack: interrupted stack frame (unused).
    """
    can_mask = hasattr(signal, 'pthread_sigmask')
    if can_mask:
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGWINCH})
    try:
        update_geometry()
        redraw()
        for cb in on_winch:
            cb()
    finally:
        # always unblock, otherwise an exception in a callback would leave
        # SIGWINCH masked and later resizes would never be handled
        if can_mask:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGWINCH})
×
70

71
def handler_sigstop(signum, stack):
    """Signal handler installed for SIGTSTP by ``init``.

    Restores the terminal state first and then really suspends the current
    process by sending it SIGSTOP.
    """
    resetterm()
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGSTOP)
×
74

75
def handler_sigcont(signum, stack):
    """Signal handler for SIGCONT: set the terminal up again and repaint."""
    setupterm()
    redraw()
×
78

79
def setupterm():
    """Put the terminal into the mode used by this module.

    Updates the cached geometry, hides the cursor, enables keypad mode and
    switches the tty to unbuffered, non-echoing input.  The attributes in
    effect on the first call are remembered in ``settings`` so that
    ``resetterm`` can restore them.
    """
    global settings
    update_geometry()
    hide_cursor()
    do('smkx') # keypad mode
    fileno = fd.fileno()
    if not settings:
        settings = termios.tcgetattr(fileno)
    mode = termios.tcgetattr(fileno)
    # tcgetattr returns [iflag, oflag, cflag, lflag, ispeed, ospeed, cc];
    # only the local flags and the control characters are modified here
    LFLAG = 3
    CC = 6
    mode[LFLAG] = mode[LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN)
    # read() returns as soon as a single byte is available, without timeout
    mode[CC][termios.VMIN] = 1
    mode[CC][termios.VTIME] = 0
    termios.tcsetattr(fileno, termios.TCSAFLUSH, mode)
×
98

99
def resetterm():
    """Undo ``setupterm``.

    Restores the saved termios attributes (if any were saved), shows the
    cursor again and leaves keypad mode.
    """
    saved = settings
    if saved:
        termios.tcsetattr(fd.fileno(), termios.TCSADRAIN, saved)
    show_cursor()
    do('rmkx')
    # XXX: i don't know why this is needed...
    #      only necessary when suspending the process
    fd.write(' \x08')
106

107
def init():
    """Take over the terminal and route output through the cell renderer.

    * registers ``resetterm`` to run at interpreter exit and sets up the tty,
    * installs the SIGWINCH / SIGTSTP / SIGCONT handlers,
    * asks the terminal for the cursor position (``ESC [ 6 n``) and creates
      the initial empty, frozen cell there,
    * replaces ``sys.stdout``, ``sys.stderr`` and the default ``log_console``
      with wrappers whose ``write`` goes through ``output`` (only for streams
      that are ttys),
    * installs an exception hook that restores the terminal and freezes all
      cells before the traceback is printed.
    """
    atexit.register(resetterm)
    setupterm()
    signal.signal(signal.SIGWINCH, handler_sigwinch)
    signal.signal(signal.SIGTSTP, handler_sigstop)
    signal.signal(signal.SIGCONT, handler_sigcont)
    # we start with one empty cell at the current cursor position
    put('\x1b[6n')
    fd.flush()
    s = ''
    while True:
        try:
            c = os.read(fd.fileno(), 1)
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
            continue
        if not c:
            # EOF: no (further) reply will arrive; without this check the
            # loop would spin forever on empty reads
            break
        if not isinstance(c, six.string_types):
            # latin-1 maps every byte to exactly one character, so a stray
            # non-ASCII byte cannot raise while decoding byte by byte
            c = c.decode('latin-1')
        s += c
        if c == 'R':
            break
    reports = re.findall('\x1b' + r'\[(\d+);(\d+)R', s)
    if reports:
        row, col = reports[0]
        row = int(row) - height
        col = int(col) - 1
    else:
        # NOTE(review): no usable cursor report was received; assume the
        # cursor is at the start of the bottom line -- confirm this fallback
        row, col = 0, 0
    cell = Cell()
    cell.start = (row, col)
    cell.end = (row, col)
    cell.content = []
    cell.frozen = True
    cell.float = 0
    cell.indent = 0
    cells.append(cell)
    class Wrapper:
        """File-like proxy: ``write`` becomes a frozen cell, everything else
        is forwarded to the wrapped stream."""
        def __init__(self, fd):
            self._fd = fd
        def write(self, s):
            output(s, frozen = True)
        def __getattr__(self, k):
            return getattr(self._fd, k)
    if sys.stdout.isatty():
        sys.stdout = Wrapper(sys.stdout)
    if sys.stderr.isatty():
        sys.stderr = Wrapper(sys.stderr)

    console = ContextType.defaults['log_console']
    if console.isatty():
        ContextType.defaults['log_console'] = Wrapper(console)

    # freeze all cells if an exception is thrown
    orig_hook = sys.excepthook
    def hook(*args):
        resetterm()
        for c in cells:
            c.frozen = True
            c.float = 0
        if orig_hook:
            orig_hook(*args)
        else:
            traceback.print_exception(*args)
        # this is a bit esoteric
        # look here for details: https://stackoverflow.com/questions/12790328/how-to-silence-sys-excepthook-is-missing-error
        if fd.fileno() == 2:
            os.close(fd.fileno())
    sys.excepthook = hook
×
172

173
def put(s):
    """Write ``s`` to the terminal stream, decoding it as UTF-8 first when it
    is not already a native string."""
    text = s if isinstance(s, six.string_types) else s.decode('utf-8')
    fd.write(text)
×
177

178
def flush():
    """Flush the terminal stream."""
    fd.flush()
1!
179

180
def do(c, *args):
    """Look up capability ``c`` (with ``args``) via ``termcap.get`` and write
    the resulting sequence to the terminal; does nothing when the lookup
    returns a false value."""
    sequence = termcap.get(c, *args)
    if not sequence:
        return
    put(sequence)
×
184

185
def goto(r, c):
    """Move the cursor to cell coordinates ``(r, c)``.

    Cell rows are relative to the bottom of the screen and to the current
    scroll offset, so they are translated to a screen row before the ``cup``
    capability is emitted.
    """
    screen_row = r - scroll + height - 1
    do('cup', screen_row, c)
×
187

188
# All cells currently tracked, in display order.
cells = list()
# Number of lines the output has scrolled so far.
scroll = 0

class Cell(object):
    """Plain attribute container for one region of output.

    Instances get their attributes (``start``, ``end``, ``content``,
    ``frozen``, ``float``, ``indent``) assigned by the code that creates them.
    """
1✔
193

194
class Handle:
    """Reference to a cell, handed out to callers of ``output``.

    The cell is identified by ``id(cell)`` rather than by holding the object
    itself; the methods forward to the module-level functions of the same
    name.
    """

    def __init__(self, cell, is_floating):
        self.h = id(cell)
        self.is_floating = is_floating

    def update(self, s):
        """Replace the cell's content with ``s``."""
        update(self.h, s)

    def freeze(self):
        """Freeze the cell."""
        freeze(self.h)

    def delete(self):
        """Delete the cell."""
        delete(self.h)
×
204

205
# Token kinds produced by parse():
#   STR - printable text            CSI - `ESC [` control sequence
#   LF  - line feed (0x0a)          BS  - backspace (0x08)
#   CR  - carriage return (0x0d)    SOH - 0x01
#   STX - 0x02                      OOB - `ESC ]` sequence passed through as-is
(STR, CSI, LF, BS, CR, SOH, STX, OOB) = tuple(range(8))
1✔
206
def parse_csi(buf, offset):
    """Parse the body of a CSI sequence starting at ``buf[offset]``.

    ``offset`` points just past the ``ESC [`` introducer.  The sequence ends
    at the first byte in the range 0x40..0x7f.

    Returns ``None`` when no such final byte is found, otherwise a tuple
    ``(cmd, args, next_offset)`` where

    * ``cmd`` is ``[final_byte, private_marker, intermediate_byte]`` (the
      latter two are ``None`` when absent),
    * ``args`` is the list of numeric parameters (``None`` for an omitted
      one),
    * ``next_offset`` is the index right after the final byte.
    """
    size = len(buf)
    end = offset
    while end < size and not (0x40 <= buf[end] < 0x80):
        end += 1
    if end >= size:
        return None
    final = buf[end]
    private = None
    intermediate = None
    pos = offset
    # optional private-parameter marker: one of `<`, `=`, `>`, `?`
    if ord('<') <= buf[pos] <= ord('?'):
        private = buf[pos]
        pos += 1
    args = []
    in_number = False
    for k in range(pos, end):
        byte = buf[k]
        if ord('0') <= byte <= ord('9'):
            digit = byte - ord('0')
            if in_number:
                args[-1] = args[-1] * 10 + digit
            else:
                args.append(digit)
                in_number = True
        elif byte == ord(';'):
            if not in_number:
                # empty parameter
                args.append(None)
            in_number = False
            if len(args) > 16:
                break
        elif 0x20 <= byte <= 0x2f:
            intermediate = byte
            break
    return [final, private, intermediate], args, end + 1
×
242

243
# (mask, expected bits, sequence length) for every multi-byte lead byte form
_UTF8_LEAD_FORMS = (
    (0b11100000, 0b11000000, 2),
    (0b11110000, 0b11100000, 3),
    (0b11111000, 0b11110000, 4),
    (0b11111100, 0b11111000, 5),
    (0b11111110, 0b11111100, 6),
)

def parse_utf8(buf, offset):
    """Return the offset just past the multi-byte sequence starting at
    ``buf[offset]``, judged only by its lead byte.

    Returns ``None`` when the byte is not a multi-byte lead byte.  Neither
    the continuation bytes nor the remaining buffer length are checked.
    """
    lead = buf[offset]
    for mask, bits, length in _UTF8_LEAD_FORMS:
        if lead & mask == bits:
            return offset + length
    return None
×
258

259
def parse(s):
    """Split ``s`` into a list of ``(kind, payload)`` tokens for rendering.

    ``s`` may be text (encoded to UTF-8 first) or bytes.  Token kinds:

    * ``STR``: payload is a list of byte strings, one per character;
      adjacent ``STR`` tokens are merged,
    * ``CSI``: payload is ``(cmd, args, raw_bytes)`` as returned by
      ``parse_csi`` plus the raw sequence,
    * ``OOB``: payload is the raw bytes of an ``ESC ]`` sequence up to and
      including its terminator (BEL or ``ESC \\``),
    * ``LF``, ``BS``, ``CR``, ``SOH``, ``STX``: payload is ``None``.

    Bytes that are not understood are emitted as the literal text ``\\xNN``.
    While the module-level ``_graphics_mode`` flag is set (between ``ESC P``
    and ``ESC \\``) tokens are dropped.
    """
    global _graphics_mode
    if isinstance(s, six.text_type):
        s = s.encode('utf8')
    out = []
    buf = bytearray(s)
    i = 0
    while i < len(buf):
        x = None
        c = buf[i]
        if c >= 0x20 and c <= 0x7e:
            x = (STR, [six.int2byte(c)])
            i += 1
        elif c & 0xc0:
            j = parse_utf8(buf, i)
            if j:
                x = (STR, [b''.join(map(six.int2byte, buf[i : j]))])
                i = j
        elif c == 0x1b and len(buf) > i + 1:
            c1 = buf[i + 1]
            if   c1 == ord('['):
                ret = parse_csi(buf, i + 2)
                if ret:
                    cmd, args, j = ret
                    x = (CSI, (cmd, args, b''.join(map(six.int2byte, buf[i : j]))))
                    i = j
            elif c1 == ord(']'):
                # XXX: this is a dirty hack:
                #  we still need to do our homework on this one, but what we do
                #  here is supporting setting the terminal title and updating
                #  the color map.  we promise to do it properly in the next
                #  iteration of this terminal emulation/compatibility layer
                #  related: https://unix.stackexchange.com/questions/5936/can-i-set-my-local-machines-terminal-colors-to-use-those-of-the-machine-i-ssh-i
                #
                # Search the bytearray with bytes needles (a str needle raises
                # on Python 3) and use whichever terminator comes first.
                ends = []
                bel = buf.find(b'\x07', i + 2)
                if bel >= 0:
                    ends.append(bel + 1)
                st = buf.find(b'\x1b\\', i + 2)
                if st >= 0:
                    # the string terminator is two bytes long
                    ends.append(st + 2)
                if ends:
                    j = min(ends)
                    x = (OOB, bytes(buf[i:j]))
                    i = j
                # without a terminator `x` stays None and the ESC is escaped
                # below, which always makes progress
            elif c1 in map(ord, '()'): # select G0 or G1
                i += 3
                continue
            elif c1 in map(ord, '>='): # set numeric/application keypad mode
                i += 2
                continue
            elif c1 == ord('P'):
                _graphics_mode = True
                i += 2
                continue
            elif c1 == ord('\\'):
                _graphics_mode = False
                i += 2
                continue
        elif c == 0x01:
            x = (SOH, None)
            i += 1
        elif c == 0x02:
            x = (STX, None)
            i += 1
        elif c == 0x08:
            x = (BS, None)
            i += 1
        elif c == 0x09:
            x = (STR, [b'    ']) # who the **** uses tabs anyway?
            i += 1
        elif c == 0x0a:
            x = (LF, None)
            i += 1
        elif c == 0x0d:
            x = (CR, None)
            i += 1

        if x is None:
            # anything not handled above is shown as a `\xNN` escape
            x = (STR, [six.int2byte(b) for b in bytearray(b'\\x%02x' % c)])
            i += 1

        if _graphics_mode:
            continue

        if x[0] == STR and out and out[-1][0] == STR:
            out[-1][1].extend(x[1])
        else:
            out.append(x)
    return out
×
346

347
# Cursor position remembered by `CSI s` and restored by `CSI u`.
saved_cursor = None
# XXX: cells that are only half-way on the screen are not rendered yet
def render_cell(cell, clear_after = False):
    """Write one cell to the terminal and record where it ends.

    The cursor is expected to already be at ``cell.start``.  While the
    content is written, the resulting cursor position is tracked (wrapping
    at ``width``, honouring the cell's indent and the cursor-moving CSI
    commands) and finally stored in ``cell.end``.  ``scroll`` is increased
    whenever output runs past the bottom line.

    Cells starting above the top of the screen are skipped entirely; in that
    case ``cell.end`` is left untouched.

    Arguments:
        cell: the cell to render.
        clear_after: when true, the rest of the line is cleared before each
            line feed.
    """
    global scroll, saved_cursor
    row, col = cell.start
    row = row - scroll + height - 1
    if row < 0:
        return
    indent = min(cell.indent, width - 1)
    for kind, payload in cell.content:
        if kind == STR:
            for ch in payload:
                if col >= width:
                    col = 0
                    row += 1
                if col < indent:
                    put(' ' * (indent - col))
                    col = indent
                if not hasattr(ch, 'encode'):
                    ch = ch.decode('utf-8', 'backslashreplace')
                put(ch)
                col += 1
        elif kind == CSI:
            cmd, args, raw = payload
            put(raw)
            # figure out if the cursor moved (XXX: here probably be bugs)
            if cmd[1] is None and cmd[2] is None:
                final = cmd[0]
                # missing or zero parameters default to 1
                n = (args[0] if len(args) >= 1 else None) or 1
                m = (args[1] if len(args) >= 2 else None) or 1
                if final in (ord('A'), ord('F')):
                    row = max(0, row - n)
                    if final == ord('F'):
                        col = 0
                elif final in (ord('B'), ord('E')):
                    row = min(height - 1, row + n)
                    if final == ord('E'):
                        col = 0
                elif final == ord('C'):
                    col = min(width - 1, col + n)
                elif final == ord('D'):
                    col = max(0, col - n)
                elif final == ord('G'):
                    col = min(width - 1, n - 1)
                elif final in (ord('H'), ord('f')):
                    row = min(height - 1, n - 1)
                    col = min(width - 1, m - 1)
                elif final == ord('S'):
                    scroll += n
                    row = max(0, row - n)
                elif final == ord('T'):
                    scroll -= n
                    row = min(height - 1, row + n)
                elif final == ord('s'):
                    saved_cursor = row, col
                elif final == ord('u'):
                    if saved_cursor:
                        row, col = saved_cursor
        elif kind == LF:
            if clear_after and col <= width - 1:
                put('\x1b[K') # clear line
            put('\n')
            col = 0
            row += 1
        elif kind == BS:
            if col > 0:
                put('\x08')
                col -= 1
        elif kind == CR:
            put('\r')
            col = 0
        elif kind == SOH:
            put('\x01')
        elif kind == STX:
            put('\x02')
        elif kind == OOB:
            put(payload)
        if row >= height:
            # output ran past the bottom line: account for the scrolling
            overshoot = row - height + 1
            scroll += overshoot
            row -= overshoot
    # convert the screen row back to a cell row
    row = row + scroll - height + 1
    cell.end = (row, col)
×
452

453
def render_from(i, force = False, clear_after = False):
    """Render cell ``i`` and the cells after it.

    Each following cell is moved to start where its predecessor now ends.
    Unless ``force`` is set, rendering stops as soon as a cell already starts
    at that position; the cursor is then put after the last cell.

    Arguments:
        i: index into ``cells``; out-of-range values are ignored.
        force: render every remaining cell even if it did not move.
        clear_after: passed on to ``render_cell``; additionally clears from
            the cursor to the end of the screen afterwards when needed.
    """
    # `i` should always be a valid cell; be defensive and do nothing if not
    if not 0 <= i < len(cells):
        return
    goto(*cells[i].start)
    prev_end = None
    for cell in cells[i:]:
        if not force and cell.start == prev_end:
            goto(*cells[-1].end)
            break
        if prev_end:
            cell.start = prev_end
        render_cell(cell, clear_after = clear_after)
        prev_end = cell.end
    if clear_after and (prev_end[0] < scroll or prev_end[1] < width - 1):
        put('\x1b[J')
    flush()
×
471

472
def redraw():
    """Repaint every cell that starts on the visible part of the screen.

    Walks the cells from the last one backwards until one starts above the
    top of the screen and re-renders everything after it.  Does nothing when
    there are no cells.
    """
    if not cells:
        return
    first = 0
    for idx in range(len(cells) - 1, -1, -1):
        if cells[idx].start[0] - scroll + height <= 0:
            # XXX: remove the `+ 1` when render_cell can draw partial cells
            first = idx + 1
            break
    render_from(first, force = True, clear_after = True)
×
483

484
# Serialises output() and update().
lock = threading.Lock()
def output(s = '', float = False, priority = 10, frozen = False,
            indent = 0, before = None, after = None):
    """Create a new cell showing ``s`` and return a ``Handle`` for it.

    Arguments:
        s: content of the cell; parsed with ``parse``.
        float: make the cell floating (used together with ``priority``).
        priority: float priority; the cell is placed after the last cell
            whose ``float`` value is not greater.
        frozen: create the cell frozen (``update`` will ignore it).
        indent: column that wrapped/continued lines are indented to.
        before: handle of a cell to insert the new cell in front of.
        after: handle of a cell to insert the new cell behind.

    With neither ``before``/``after`` nor a float priority the cell is put
    after the last non-floating cell.
    """
    with lock:
        anchor = before or after
        if anchor:
            idx, _ = find_cell(anchor.h)
            is_floating = anchor.is_floating
            float = cells[idx].float
            if before:
                idx -= 1
        elif float and priority:
            is_floating = True
            float = priority
            for idx in reversed(range(len(cells))):
                if cells[idx].float <= float:
                    break
        else:
            is_floating = False
            idx = len(cells) - 1
            while idx > 0 and cells[idx].float:
                idx -= 1
        cell = Cell()
        cell.content = parse(s)
        cell.frozen = frozen
        cell.float = float
        cell.indent = indent
        # the new cell starts where its predecessor ends
        cell.start = cells[idx].end
        idx += 1
        cells.insert(idx, cell)
        handle = Handle(cell, is_floating)
        if not s:
            # nothing to draw
            cell.end = cell.start
        elif idx == len(cells) - 1:
            # the invariant is that the cursor is placed after the last cell
            render_cell(cell, clear_after = True)
            flush()
        else:
            render_from(idx, clear_after = True)
        return handle
×
526

527
def find_cell(h):
    """Return ``(index, cell)`` for the cell whose ``id()`` equals ``h``.

    Raises:
        KeyError: no such cell is in ``cells``.
    """
    found = next(
        ((idx, cell) for idx, cell in enumerate(cells) if id(cell) == h),
        None,
    )
    if found is None:
        raise KeyError
    return found
×
532

533
def discard_frozen():
    """Drop leading cells that have scrolled far out of view.

    We assume that no cell will shrink very much and that no one has room for
    more than ``MAX_TERM_HEIGHT`` lines in their terminal.  At least one cell
    is always kept.
    """
    while len(cells) > 1:
        if scroll - cells[0].end[0] <= MAX_TERM_HEIGHT:
            break
        del cells[0]
×
539

540
def update(h, s):
    """Replace the content of the cell identified by ``h`` with ``s``.

    Unknown handles and frozen cells are silently ignored.  The cell (and
    whatever follows it) is only re-rendered when the parsed content actually
    differs from what is currently shown.

    Arguments:
        h: cell identifier as stored in ``Handle.h``.
        s: new content; parsed with ``parse``.
    """
    with lock:
        try:
            i, c = find_cell(h)
        except KeyError:
            return
        if c.frozen:
            return
        # compare parsed content with parsed content; comparing the stored
        # token list with the raw string would always report a change
        content = parse(s)
        if c.content != content:
            c.content = content
            render_from(i, clear_after = True)
×
549

550
def freeze(h):
    """Freeze the cell identified by ``h`` so it can no longer be updated.

    The cell also stops floating.  A frozen cell without content is removed
    right away, and cells that scrolled far off-screen are discarded.
    Unknown handles are silently ignored.
    """
    try:
        idx, cell = find_cell(h)
    except KeyError:
        return
    cell.frozen = True
    cell.float = 0
    if cell.content == []:
        cells.pop(idx)
    discard_frozen()
×
560

561
def delete(h):
    """Remove the cell identified by ``h``: blank its content, then freeze it
    (freezing drops cells whose content is empty)."""
    update(h, '')
    freeze(h)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc