• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 7171266103

11 Dec 2023 05:41PM UTC coverage: 74.638% (+0.1%) from 74.499%
7171266103

push

github

Arusekk
term: hotfix

4564 of 7217 branches covered (0.0%)

12834 of 17195 relevant lines covered (74.64%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.15
/pwnlib/term/term.py
1
from __future__ import absolute_import
1✔
2
from __future__ import division
1✔
3
from __future__ import unicode_literals
1✔
4

5
import atexit
1✔
6
import errno
1✔
7
import os
1✔
8
import re
1✔
9
import shutil
1✔
10
import signal
1✔
11
import struct
1✔
12
import sys
1✔
13
import threading
1✔
14
import traceback
1✔
15
import weakref
1✔
16

17
if sys.platform != 'win32':
1!
18
    import fcntl
1✔
19
    import termios
1✔
20

21
from ..context import ContextType
1✔
22
from . import termcap
1✔
23
from .. import py2compat
1✔
24

25
__all__ = ['output', 'init']
1✔
26

27
# we assume no terminal can display more lines than this
28
MAX_TERM_HEIGHT = 200
1✔
29

30
# list of callbacks triggered on SIGWINCH
31
on_winch = []
1✔
32

33
cached_pos = None
1✔
34
settings = None
1✔
35
setup_done = False
1✔
36
epoch = 0
1✔
37

38
fd = sys.stdout
1✔
39
winchretry = False
1✔
40
rlock = threading.RLock()
1✔
41

42
class WinchLock(object):
1✔
43
    def __init__(self):
1✔
44
        self.guard = threading.RLock()
1✔
45
        self.lock = threading.Lock()
1✔
46

47
    @property
1✔
48
    def acquire(self):
1✔
49
        return self.lock.acquire
×
50

51
    @property
1✔
52
    def release(self):
1✔
53
        return self.lock.release
×
54

55
    def __enter__(self):
1✔
56
        self.guard.acquire()
1✔
57
        return self.lock.__enter__()
1✔
58
    def __exit__(self, tp, val, tb):
1✔
59
        try:
1✔
60
            return self.lock.__exit__(tp, val, tb)
1✔
61
        finally:
62
            if winchretry:
1!
63
                handler_sigwinch(signal.SIGWINCH, None)
×
64
            self.guard.release()
1✔
65

66
wlock = WinchLock()
1✔
67

68
def show_cursor():
1✔
69
    do('cnorm')
1✔
70

71
def hide_cursor():
1✔
72
    do('civis')
1✔
73

74
def update_geometry():
1✔
75
    global width, height
76
    width, height = shutil.get_terminal_size()
1✔
77

78
def handler_sigwinch(signum, stack):
1✔
79
    global cached_pos, winchretry
80
    with wlock.guard:
×
81
        while True:
×
82
            if not wlock.acquire(False):
×
83
                winchretry = True
×
84
                return
×
85

86
            winchretry = False
×
87
            update_geometry()
×
88
            for cb in on_winch:
×
89
                cb()
×
90
            wlock.release()
×
91
            if not winchretry: break
×
92

93

94
def handler_sigstop(signum, stack):
1✔
95
    resetterm()
×
96
    os.kill(0, signal.SIGSTOP)
×
97

98
def handler_sigcont(signum, stack):
1✔
99
    global epoch, cached_pos, setup_done
100
    epoch += 1
×
101
    cached_pos = None
×
102
    setup_done = False
×
103

104
def setupterm():
1✔
105
    global settings, setup_done
106
    if setup_done:
1✔
107
        return
1✔
108
    setup_done = True
1✔
109
    hide_cursor()
1✔
110
    update_geometry()
1✔
111
    do('smkx') # keypad mode
1✔
112
    mode = termios.tcgetattr(fd)
1✔
113
    IFLAG, OFLAG, CFLAG, LFLAG, ISPEED, OSPEED, CC = range(7)
1✔
114
    if not settings:
1!
115
        settings = mode[:]
1✔
116
        settings[CC] = settings[CC][:]
1✔
117
    mode[LFLAG] &= ~(termios.ECHO | termios.ICANON | termios.IEXTEN)
1✔
118
    mode[CC][termios.VMIN] = 1
1✔
119
    mode[CC][termios.VTIME] = 0
1✔
120
    termios.tcsetattr(fd, termios.TCSADRAIN, mode)
1✔
121
    fd.flush()
1✔
122

123
def resetterm():
1✔
124
    global settings, setup_done
125
    if settings:
1!
126
        termios.tcsetattr(fd, termios.TCSADRAIN, settings)
1✔
127
        settings = None
1✔
128
    if setup_done:
1!
129
        setup_done = False
1✔
130
        show_cursor()
1✔
131
        do('rmkx')
1✔
132
        fd.flush()
1✔
133

134
def init():
1✔
135
    atexit.register(resetterm)
1✔
136
    signal.signal(signal.SIGWINCH, handler_sigwinch)
1✔
137
    signal.signal(signal.SIGTSTP, handler_sigstop)
1✔
138
    signal.signal(signal.SIGCONT, handler_sigcont)
1✔
139
    class Wrapper:
1✔
140
        def __init__(self, fd):
1✔
141
            self._fd = fd
1✔
142
        def write(self, s):
1✔
143
            return output(s, frozen=True)
1✔
144
        def __getattr__(self, k):
1✔
145
            return getattr(self._fd, k)
1✔
146
    if sys.stdout.isatty():
1!
147
        sys.stdout = Wrapper(sys.stdout)
1✔
148
    if sys.stderr.isatty():
1!
149
        sys.stderr = Wrapper(sys.stderr)
×
150

151
    console = ContextType.defaults['log_console']
1✔
152
    if console.isatty():
1!
153
        ContextType.defaults['log_console'] = Wrapper(console)
1✔
154

155
    # freeze all cells if an exception is thrown
156
    orig_hook = sys.excepthook
1✔
157
    def hook(*args):
1✔
158
        sys.stderr = sys.__stderr__
×
159
        resetterm()
×
160
        cells.clear()
×
161
        if orig_hook:
×
162
            orig_hook(*args)
×
163
        else:
164
            traceback.print_exception(*args)
×
165
    sys.excepthook = hook
1✔
166

167
tmap = {c: '\\x{:02x}'.format(c) for c in set(range(0x20)) - {0x09, 0x0a, 0x0d, 0x1b} | {0x7f}}
1✔
168

169
def put(s):
1✔
170
    global cached_pos, epoch
171
    s = s.translate(tmap)
1✔
172
    if cached_pos:
1✔
173
        it = iter(s.replace('\n', '\r\n'))
1✔
174
        sanit_s = ''
1✔
175
        for c in it:
1✔
176
            if c == '\r':
1✔
177
                cached_pos[1] = 0
1✔
178
            elif c == '\n':
1✔
179
                cached_pos[0] += 1
1✔
180
            elif c == '\t':
1!
181
                cached_pos[1] = (cached_pos[1] + 8) & -8
×
182
            elif c in '\x1b\u009b':  # ESC or CSI
1✔
183
                seq = c
1✔
184
                for c in it:
1!
185
                    seq += c
1✔
186
                    if c not in '[0123456789;':
1✔
187
                        break
1✔
188
                else:
189
                    # unterminated ctrl seq, just print it visually
190
                    c = seq.replace('\x1b', r'\x1b').replace('\u009b', r'\u009b')
×
191
                    cached_pos[1] += len(c)
×
192

193
                # if '\e[123;123;123;123m' then nothing
194
                if c == 'm':
1!
195
                    c = seq
1✔
196
                else:
197
                    # undefined ctrl seq, just print it visually
198
                    c = seq.replace('\x1b', r'\x1b').replace('\u009b', r'\u009b')
×
199
                    cached_pos[1] += len(c)
×
200
            elif c < ' ':
1!
201
                assert False, 'impossible ctrl char'
×
202
            else:
203
                # normal character, nothing to see here
204
                cached_pos[1] += 1
1✔
205
            sanit_s += c
1✔
206
        else:
207
            s = sanit_s.replace('\r\n', '\n')
1✔
208
    return fd.write(s)
1✔
209

210
def do(c, *args):
1✔
211
    s = termcap.get(c, *args)
1✔
212
    if s:
1!
213
        fd.write(s.decode('utf-8'))
1✔
214

215
def goto(rc):
1✔
216
    global cached_pos
217
    r, c = rc
1✔
218
    nowr, nowc = cached_pos or (None, None)
1✔
219
    cached_pos = [r, c]
1✔
220
    # common cases: we can just go up/down a couple rows
221
    if c == 0:
1✔
222
        if r == nowr + 1:
1✔
223
            fd.write('\n')
1✔
224
            return
1✔
225
        if c != nowc:
1✔
226
            fd.write('\r')
1✔
227
    elif c != nowc:
1!
228
        do('hpa', c)
1✔
229

230
    if r == nowr - 1:
1✔
231
        do('cuu1')
1✔
232
    elif r < nowr:
1✔
233
        do('cuu', nowr - r)
1✔
234
    elif r > nowr:
1✔
235
        do('cud', r - nowr)
1✔
236

237

238
class Cell(object):
1✔
239
    def __init__(self, value, float):
1✔
240
        self.value = value
1✔
241
        self.float = float
1✔
242

243
    def draw(self):
1✔
244
        self.pos = get_position()
1✔
245
        self.born = epoch
1✔
246
        put(self.value)
1✔
247
        self.pos_after = get_position()
1✔
248

249
    def update(self, value):
1✔
250
        if isinstance(value, bytes):
1✔
251
            value = value.decode('utf-8', 'backslashreplace')
1✔
252
        with wlock:
1✔
253
            want_erase_line = False
1✔
254
            if '\n' in value:
1!
255
                if len(value) < len(self.value):
×
256
                    want_erase_line = True
×
257
                elif '\n' not in self.value:  # not really supported
×
258
                    for cell in cells.iter_after(self):
×
259
                        if cell.value:
×
260
                            want_erase_line = True
×
261
                        break
×
262
            self.value = value
1✔
263
            self.update_locked(erase_line=want_erase_line)
1✔
264
            fd.flush()
1✔
265

266
    def prepare_redraw(self):
1✔
267
        global epoch
268
        if self.born != epoch:
1!
269
            return None
×
270
        saved = get_position()
1✔
271
        if saved < self.pos or saved == (1, 1):
1!
272
            epoch += 1
×
273
            return None
×
274
        goto(self.pos)
1✔
275
        return saved
1✔
276

277
    def update_locked(self, erase_line=False):
1✔
278
        prev_pos = self.prepare_redraw()
1✔
279
        if prev_pos is None:
1!
280
            for cell in cells:
×
281
                cell.draw()
×
282
            return
×
283
        erased_line = None
1✔
284
        if erase_line:
1!
285
            do('el')
×
286
            erased_line = self.pos[0]
×
287
        put(self.value)
1✔
288
        pos = get_position()
1✔
289
        if pos == self.pos_after:
1!
290
            goto(prev_pos)
1✔
291
            return
1✔
292
        if pos < self.pos_after:
×
293
            do('el')
×
294
            erased_line = self.pos[0]
×
295
        old_after = self.pos_after
×
296
        self.pos_after = pos
×
297

298
        cell = self  # in case there are no more cells
×
299
        for cell in cells.iter_after(self):
×
300
            if old_after != cell.pos:
×
301
                # do not merge gaps
302
                break
×
303
            pos = get_position()
×
304
            if erased_line != pos[0]:
×
305
                if pos[0] < cell.pos[0]:
×
306
                    # the cell moved up, erase its line
307
                    do('el')
×
308
                    erased_line = pos[0]
×
309
                elif cell.pos == pos:
×
310
                    # cell got neither moved nor erased
311
                    break
×
312

313
            if pos[1] < cell.pos[1]:
×
314
                # the cell moved left, it must be same line as self; erase if not yet erased
315
                if not erase_line and erased_line != pos[0]:
×
316
                    do('el')
×
317
                    erased_line = pos[0]
×
318

319
            old_after = cell.pos_after
×
320
            cell.draw()
×
321
            if cell.pos_after == old_after and erased_line != old_after[0]:
×
322
                break
×
323
        else:
324
            if cell.float:
×
325
                # erase all screen after last float
326
                do('ed')
×
327
        if prev_pos > get_position():
×
328
            goto(prev_pos)
×
329

330
    def __repr__(self):
1✔
331
        return '{}({!r}, float={}, pos={})'.format(self.__class__.__name__, self.value, self.float, self.pos)
×
332

333

334
class WeakCellList(object):
1✔
335
    def __init__(self):
1✔
336
        self._cells = []
1✔
337
        self._floats = []
1✔
338
        self._lists = self._cells, self._floats
1✔
339

340
    @property
1✔
341
    def cells(self):
1✔
342
        return self.iter_field(self._cells)
×
343

344
    @property
1✔
345
    def floats(self):
1✔
346
        return self.iter_field(self._floats)
1✔
347

348
    def iter_field(self, *Ls):
1✔
349
        for L in Ls:
1✔
350
            for iref in L[:]:
1✔
351
                i = iref()
1✔
352
                if i is None:
1!
353
                    L.remove(iref)
×
354
                else:
355
                    yield i
1✔
356

357
    def __iter__(self):
1✔
358
        return self.iter_field(*self._lists)
×
359

360
    def iter_after(self, v):
1✔
361
        it = iter(self)
×
362
        for cell in it:
×
363
            if cell == v:
×
364
                break
×
365
        return it
×
366

367
    def clear(self):
1✔
368
        for c in self:
×
369
            c.float = False
×
370
        for L in self._lists:
×
371
            del L[:]
×
372

373
    def insert(self, v, before):
1✔
374
        L = self._lists[v.float]
×
375
        for i, e in enumerate(self.iter_field(L)):
×
376
            if e == before:
×
377
                L.insert(i, weakref.ref(v))
×
378
                return
×
379
        raise IndexError('output before dead cell')
×
380

381
    def append(self, v):
1✔
382
        L = self._lists[v.float]
1✔
383
        L.append(weakref.ref(v))
1✔
384

385

386
cells = WeakCellList()
1✔
387

388

389
def get_position():
1✔
390
    global cached_pos
391
    if not cached_pos:
1✔
392
        cached_pos = [0, 0]
1✔
393
    return tuple(cached_pos)
1✔
394

395

396
def output(s='', float=False, priority=10, frozen=False, indent=0, before=None):
1✔
397
    with wlock:
1✔
398
        if before:
1!
399
            float = before.float
×
400

401
        if isinstance(s, bytes):
1✔
402
            s = s.decode('utf-8', 'backslashreplace')
1✔
403
        if frozen:
1✔
404
            for f in cells.floats:
1✔
405
                f.prepare_redraw()
1✔
406
                do('ed')  # we could do it only when necessary
1✔
407
                break
1✔
408
            ret = put(s)
1✔
409
            for f in cells.floats:
1✔
410
                f.draw()
1✔
411
            for f in cells.floats:
1✔
412
                fd.flush()
1✔
413
                break
1✔
414
            return ret
1✔
415

416
        c = Cell(s, float)
1✔
417
        if before is None:
1!
418
            cells.append(c)
1✔
419
            c.draw()
1✔
420
        else:
421
            before.prepare_redraw()
×
422
            cells.insert(c, before)
×
423
            c.draw()
×
424
            for f in cells.iter_after(c):
×
425
                f.draw()
×
426
        return c
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc