• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 7250412654

18 Dec 2023 03:44PM UTC coverage: 74.547% (+0.1%) from 74.452%
7250412654

push

github

web-flow
Merge branch 'dev' into retguard

4565 of 7244 branches covered (0.0%)

350 of 507 new or added lines in 17 files covered. (69.03%)

13 existing lines in 5 files now uncovered.

12843 of 17228 relevant lines covered (74.55%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.56
/pwnlib/term/readline.py
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import
1✔
3
from __future__ import division
1✔
4
from __future__ import print_function
1✔
5

6
import io
1✔
7
import six
1✔
8
import sys
1✔
9

10
from pwnlib.term import keyconsts as kc
1✔
11
from pwnlib.term import keymap as km
1✔
12
from pwnlib.term import term
1✔
13
from pwnlib.term import text
1✔
14

15
# Callable used to render the character under the cursor.
cursor = text.reverse

# Text to the left and to the right of the cursor.
buffer_left = ''
buffer_right = ''
# (left, right) pair stashed by history_prev() while history is browsed.
saved_buffer = None
# Submitted lines; readline() inserts each new one at index 0.
history = []
# Index into ``history`` while browsing it, otherwise None.
history_idx = None
# Output handles; readline() assigns them while a line is being read.
prompt_handle = None
buffer_handle = None
suggest_handle = None
# Index into ``search_results`` while searching, otherwise None.
search_idx = None
# (history index, match start, match end) tuples from update_search_results().
search_results = []
# Optional callables run by readline() before and after reading a line.
startup_hook = None
shutdown_hook = None

# Characters the word-wise commands treat as word boundaries.
delims = ' /;:.\\'

# Whether redisplay() shows an inline completion / the suggestion list.
show_completion = True
show_suggestions = False

# Completion callbacks installed by set_completer().
complete_hook = None
suggest_hook = None

# Count of consecutive <tab> key presses, maintained by handle_keypress().
tabs = 0
1✔
38

39
def force_to_bytes(data):
    """Return ``data`` as bytes.

    Bytes are returned unchanged.  Anything else is encoded as UTF-8, and
    as Latin-1 if the UTF-8 encoding raises.
    """
    if not isinstance(data, bytes):
        try:
            data = data.encode('utf-8')
        except Exception:
            data = data.encode('latin-1')
    return data
×
46

47
def set_completer(completer):
    """Install the hooks of ``completer``, or remove both when it is ``None``.

    Arguments:
        completer: Object exposing ``complete`` and ``suggest`` attributes,
            or ``None`` to clear ``complete_hook`` and ``suggest_hook``.
    """
    global complete_hook, suggest_hook
    if completer is not None:
        complete_hook = completer.complete
        suggest_hook = completer.suggest
        return
    complete_hook = suggest_hook = None
×
55

56
def fmt_suggestions(suggestions, width=None):
    """Lay out completion suggestions in left-aligned columns.

    Arguments:
        suggestions(list): Strings to display.
        width(int): Total width available for a row; defaults to
            ``term.width`` when omitted.

    Returns:
        The rows joined together, each terminated by a newline, or
        ``'(no completions)\\n'`` when ``suggestions`` is empty.
    """
    if not suggestions:
        return '(no completions)\n'
    if width is None:
        width = term.width
    longest = max(map(len, suggestions))
    # A suggestion at least as wide as the terminal used to yield zero
    # columns and a ZeroDivisionError below; always keep one column.
    columns = max(1, width // (longest + 1))
    column_width = width // columns
    fmt = '%%-%ds' % column_width
    rows = []
    for start in range(0, len(suggestions), columns):
        row = suggestions[start:start + columns]
        rows.append(''.join(fmt % item for item in row))
    return '\n'.join(rows) + '\n'
×
72

73
def auto_complete(*_):
    """Key handler for completion.

    While a history search is active the search is committed.  Otherwise,
    when ``tabs`` is 1, whatever ``complete_hook`` returns is inserted;
    for any other count the suggestion list is toggled and the line
    redrawn.
    """
    global show_suggestions, tabs
    if search_idx is not None:
        commit_search()
        tabs = 0
        return
    if tabs != 1:
        show_suggestions = not show_suggestions
        redisplay()
        return
    if not complete_hook:
        return
    completion = complete_hook(buffer_left, buffer_right)
    if completion:
        tabs = 0
        insert_text(completion)
×
87

88
def handle_keypress(trace):
    """Track consecutive tab presses in ``tabs``.

    The counter is incremented when the last key of ``trace`` equals
    ``'<tab>'`` and reset to zero for any other key.
    """
    global tabs
    tabs = tabs + 1 if trace[-1] == '<tab>' else 0
×
95

96
def clear():
    """Empty the edit buffer, leave history and search mode, and redraw."""
    global buffer_left, buffer_right, history_idx, search_idx
    history_idx = search_idx = None
    buffer_left = buffer_right = ''
    redisplay()
×
102

103
def redisplay():
    """Redraw the suggestion area and the input line from module state.

    Does nothing unless ``buffer_handle`` is set.
    """
    global suggest_handle
    if not buffer_handle:
        return

    # Suggestion area: filled while suggestions are enabled, blanked
    # otherwise (if it was ever created).
    if show_suggestions and suggest_hook:
        candidates = suggest_hook(buffer_left, buffer_right)
        if suggest_handle is None:
            anchor = prompt_handle or buffer_handle
            suggest_handle = term.output(before = anchor)
        suggest_handle.update(fmt_suggestions(candidates))
    elif suggest_handle:
        suggest_handle.update('')

    # Search mode: show the selected history entry with the match
    # highlighted, or the query itself when nothing matches.
    if search_idx is not None:
        if search_results:
            idx, start, end = search_results[search_idx]
            entry = history[idx]
            shown = entry[:start] + text.bold_green(entry[start:end]) + entry[end:]
        else:
            shown = text.white_on_red(buffer_left)
        buffer_handle.update('(search) ' + shown)
        return

    # Normal mode: mark the character under the cursor, or preview the
    # completion when the cursor sits at the end of the line.
    shown = None
    if buffer_right:
        shown = buffer_left + cursor(buffer_right[0]) + buffer_right[1:]
    elif show_completion and complete_hook:
        completion = complete_hook(buffer_left, buffer_right)
        if completion:
            shown = (buffer_left
                     + text.underline(cursor(completion[0]))
                     + text.underline(completion[1:]))
    buffer_handle.update(shown or buffer_left + cursor(' '))
×
136

137
def self_insert(trace):
    """Insert the code of a single unmodified unicode key into the buffer.

    Traces that are not exactly one key long, and keys of another type or
    with modifiers, are ignored.
    """
    if len(trace) == 1:
        key = trace[0]
        if key.type == kc.TYPE_UNICODE and key.mods == kc.MOD_NONE:
            insert_text(key.code)
×
143

144
def set_buffer(left, right):
    """Replace the text on both sides of the cursor and redraw.

    Arguments:
        left: New content left of the cursor.
        right: New content right of the cursor.
    """
    global buffer_left, buffer_right
    buffer_left, buffer_right = left, right
    redisplay()
×
149

150
def cancel_search(*_):
    """Leave search mode, if active, and redraw; the buffer is untouched."""
    global search_idx
    if search_idx is None:
        return
    search_idx = None
    redisplay()
×
155

156
def commit_search():
    """Accept the selected search result as the new buffer.

    Only acts while a search is active and has at least one result; the
    matched history entry is placed left of the cursor and search mode
    ends.
    """
    global search_idx
    if search_idx is None or not search_results:
        return
    hist_pos = search_results[search_idx][0]
    set_buffer(history[hist_pos], '')
    search_idx = None
    redisplay()
×
162

163
def update_search_results():
    """Recompute ``search_results`` for the query held in ``buffer_left``.

    Does nothing outside search mode.  Each history entry contributes at
    most one result, for its first occurrence of the query.  If the entry
    that was selected before still matches, it stays selected; otherwise
    the selection falls back to the first result.
    """
    global search_results, search_idx, show_suggestions
    if search_idx is None:
        return
    show_suggestions = False
    # Remember which history entry was selected before rebuilding the list.
    previous = search_results[search_idx][0] if search_results else None
    search_results = []
    search_idx = 0
    if not buffer_left:
        return
    query_len = len(buffer_left)
    for idx, entry in enumerate(history):
        start = entry.find(buffer_left)
        if start < 0:
            continue
        if previous is not None and idx == previous:
            search_idx = len(search_results)
        search_results.append((idx, start, start + query_len))
×
183

184
def search_history(*_):
    """Start a history search, or step to the next result of a running one.

    Starting a search moves the whole buffer left of the cursor, where it
    serves as the query, and leaves history browsing.
    """
    global buffer_left, buffer_right, history_idx, search_idx
    if search_idx is not None:
        if search_results:
            search_idx = (search_idx + 1) % len(search_results)
    else:
        buffer_left, buffer_right = buffer_left + buffer_right, ''
        history_idx = None
        search_idx = 0
        update_search_results()
    redisplay()
×
194

195
def history_prev(*_):
    """Step to the next higher index in ``history`` and load that entry.

    On the first step the current buffer is stashed in ``saved_buffer``.
    Any active search is cancelled.  Nothing happens when the history is
    empty or its last entry is already shown.
    """
    global history_idx, saved_buffer
    if not history:
        return
    cancel_search()
    if history_idx is None:
        saved_buffer = (buffer_left, buffer_right)
        history_idx = -1
    following = history_idx + 1
    if following < len(history):
        history_idx = following
        set_buffer(history[following], '')
×
206

207
def history_next(*_):
    """Step to the next lower index in ``history``.

    From index 0 the buffer stashed in ``saved_buffer`` is restored and
    history browsing ends.  Does nothing when history is not being
    browsed.
    """
    global history_idx, saved_buffer
    if history_idx is None:
        return
    cancel_search()
    if history_idx != 0:
        history_idx -= 1
        set_buffer(history[history_idx], '')
        return
    set_buffer(*saved_buffer)
    history_idx = None
    saved_buffer = None
×
219

220
def backward_char(*_):
    """Commit any search, then move the cursor one character to the left."""
    global buffer_left, buffer_right
    commit_search()
    if buffer_left:
        moved = buffer_left[-1]
        buffer_right = moved + buffer_right
        buffer_left = buffer_left[:-1]
    redisplay()
×
227

228
def forward_char(*_):
    """Commit any search, then move the cursor one character to the right."""
    global buffer_left, buffer_right
    commit_search()
    if buffer_right:
        moved = buffer_right[0]
        buffer_left = buffer_left + moved
        buffer_right = buffer_right[1:]
    redisplay()
×
235

236
def insert_text(s):
    """Append ``s`` left of the cursor, refresh search results and redraw.

    Inserting while browsing history ends the browsing and drops the
    stashed buffer.
    """
    global history_idx, saved_buffer, buffer_left
    if history_idx is not None:
        history_idx = saved_buffer = None
    buffer_left = buffer_left + s
    update_search_results()
    redisplay()
×
244

245
def submit(*_):
    """Commit a running search, or stop the keymap when not searching."""
    if search_idx is None:
        keymap.stop()
    else:
        commit_search()
×
250

251
def control_c(*_):
    """Cancel the innermost active state.

    In order: cancel a search, else leave history browsing and restore
    the stashed buffer, else clear a non-empty buffer.

    Raises:
        KeyboardInterrupt: When none of the above applies.
    """
    global history_idx, saved_buffer
    if search_idx is not None:
        cancel_search()
        return
    if history_idx is not None:
        set_buffer(*saved_buffer)
        history_idx = None
        saved_buffer = None
        return
    if not (buffer_left or buffer_right):
        raise KeyboardInterrupt
    clear()
×
263

264
def control_d(*_):
    """Flag end-of-file and stop the keymap, but only on an empty buffer."""
    global eof
    if buffer_left or buffer_right:
        return
    eof = True
    keymap.stop()
×
270

271
def kill_to_end(*_):
    """Delete everything from the cursor to the end of the line.

    Any active history search is committed first, then the line is
    redrawn.
    """
    global buffer_right
    commit_search()
    # The buffers are strings.  Assigning a list here made every later
    # ``buffer_left + buffer_right`` raise TypeError.
    buffer_right = ''
    redisplay()
×
276

277
def delete_char_forward(*_):
    """Commit any search, then delete the character under the cursor."""
    global buffer_right
    commit_search()
    if not buffer_right:
        return
    buffer_right = buffer_right[1:]
    redisplay()
×
283

284
def delete_char_backward(*_):
    """Delete the character left of the cursor.

    The search is not committed here; the search results are refreshed
    for the shortened ``buffer_left`` instead.
    """
    global buffer_left
    if not buffer_left:
        return
    buffer_left = buffer_left[:-1]
    update_search_results()
    redisplay()
×
290

291
def kill_word_backward(*_):
    """Commit any search, then delete the word left of the cursor.

    Delimiters (``delims``) directly left of the cursor are deleted too;
    deletion stops at the first delimiter found after a non-delimiter.
    """
    global buffer_left
    commit_search()
    seen_word = False
    while buffer_left:
        if buffer_left[-1] not in delims:
            seen_word = True
        elif seen_word:
            break
        buffer_left = buffer_left[:-1]
    redisplay()
×
304

305
def backward_word(*_):
    """Commit any search, then move the cursor to the start of the word.

    Delimiters (``delims``) directly left of the cursor are skipped; the
    cursor stops right after the first delimiter found past a
    non-delimiter.
    """
    global buffer_left, buffer_right
    commit_search()
    seen_word = False
    while buffer_left:
        last = buffer_left[-1]
        if last not in delims:
            seen_word = True
        elif seen_word:
            break
        buffer_right = last + buffer_right
        buffer_left = buffer_left[:-1]
    redisplay()
×
319

320
def forward_word(*_):
    """Commit any search, then move the cursor past the next word.

    Delimiters (``delims``) under and right of the cursor are skipped; the
    cursor stops on the first delimiter found after a non-delimiter.
    """
    global buffer_left, buffer_right
    commit_search()
    seen_word = False
    while buffer_right:
        first = buffer_right[0]
        if first not in delims:
            seen_word = True
        elif seen_word:
            break
        buffer_left = buffer_left + first
        buffer_right = buffer_right[1:]
    redisplay()
×
334

335
def go_beginning(*_):
    """Commit any search, then move the cursor to the start of the line."""
    commit_search()
    whole = buffer_left + buffer_right
    set_buffer('', whole)
×
338

339
def go_end(*_):
    """Commit any search, then move the cursor to the end of the line."""
    commit_search()
    whole = buffer_left + buffer_right
    set_buffer(whole, '')
×
342

343
# Key descriptions mapped to the handler functions defined in this module.
keymap = km.Keymap({
    # Plain text input.
    '<nomatch>': self_insert,
    # History and cursor movement.
    '<up>': history_prev,
    '<down>': history_next,
    '<left>': backward_char,
    '<right>': forward_char,
    # Single-character deletion.
    '<del>': delete_char_backward,
    '<delete>': delete_char_forward,
    # Submitting the line.
    '<enter>': submit,
    'C-j': submit,
    # Word-wise movement.
    'C-<left>': backward_word,
    'C-<right>': forward_word,
    'M-<left>': backward_word,
    'M-<right>': forward_word,
    # Interrupt and end-of-file.
    'C-c': control_c,
    'C-d': control_d,
    # Killing text.
    'C-k': kill_to_end,
    'C-w': kill_word_backward,
    '<backspace>': kill_word_backward,
    'M-<del>': kill_word_backward,
    # History search.
    'C-r': search_history,
    '<escape>': cancel_search,
    # Jumping to the line ends.
    'C-a': go_beginning,
    'C-e': go_end,
    # Completion and tab counting.
    '<tab>': auto_complete,
    '<any>': handle_keypress,
})
370

371
def readline(_size=-1, prompt='', float=True, priority=10):
    """Read one line of input and return it as bytes.

    Without ``pwnlib.term.term_mode`` the prompt is printed and a line is
    read from ``sys.stdin`` (its ``buffer`` attribute when present), with
    trailing newlines stripped.  In term mode the line is edited
    interactively through ``keymap``; a non-empty line is also recorded
    at the front of ``history``.

    Arguments:
        _size(int): Passed to the stream's ``readline`` outside term mode;
            unused in term mode.  Kept for compatibility with the
            existing readline.
        prompt(str): Text shown in front of the input.
        float: Forwarded to ``term.output`` for the prompt and the buffer.
        priority: Forwarded to ``term.output`` for the prompt and the buffer.

    Returns:
        The line read; ``b''`` on end-of-file with an empty buffer.

    Raises:
        KeyboardInterrupt: When interrupted and ``control_c`` has nothing
            left to cancel.
    """
    global buffer_handle, prompt_handle, suggest_handle, eof, \
        show_suggestions

    # XXX circular imports
    from pwnlib.term import term_mode
    if not term_mode:
        six.print_(prompt, end='', flush=True)
        stream = getattr(sys.stdin, 'buffer', sys.stdin)
        return stream.readline(_size).rstrip(b'\n')

    show_suggestions = False
    eof = False
    if not prompt:
        prompt_handle = None
    else:
        prompt_handle = term.output(prompt, float = float, priority = priority)
    buffer_handle = term.output(float = float, priority = priority)
    suggest_handle = None
    clear()
    if startup_hook:
        startup_hook()
    try:
        while True:
            try:
                try:
                    keymap.handle_input()
                except EOFError:
                    if not buffer_left + buffer_right:
                        return b''
                if eof:
                    return b''
                entered = buffer_left + buffer_right
                if entered:
                    history.insert(0, entered)
                return force_to_bytes(entered)
            except KeyboardInterrupt:
                # Let control_c() cancel the current state first.  If it
                # raises KeyboardInterrupt itself, re-raise only after its
                # inner handler has finished; otherwise keep reading.
                do_raise = False
                try:
                    control_c()
                except KeyboardInterrupt:
                    do_raise = True
                if do_raise:
                    raise
    finally:
        # Leave the final text on screen and release the output handles.
        buffer_handle.update(buffer_left + buffer_right + '\n')
        buffer_handle = None
        if prompt_handle:
            prompt_handle = None
        if suggest_handle:
            suggest_handle = None
        if shutdown_hook:
            shutdown_hook()
×
427

428
def raw_input(prompt='', float=True):
    r"""raw_input(prompt='', float=True)

    Drop-in for the built-in ``raw_input`` backed by the ``pwnlib``
    readline implementation.

    Arguments:
        prompt(str): Text displayed before the input.
        float(bool): When `True`, prompt and input float to the bottom of
                     the screen while `term.term_mode` is enabled.

    Returns:
        Whatever :func:`readline` returns for the line that was read.
    """
    return readline(_size=-1, prompt=prompt, float=float)
1✔
440

441
def str_input(prompt='', float=True):
    r"""str_input(prompt='', float=True)

    Drop-in for the python3 built-in ``input`` backed by the ``pwnlib``
    readline implementation.

    Arguments:
        prompt(str): Text displayed before the input.
        float(bool): When `True`, prompt and input float to the bottom of
                     the screen while `term.term_mode` is enabled.

    Returns:
        The line from :func:`readline`, decoded with ``decode()``.
    """
    raw = readline(_size=-1, prompt=prompt, float=float)
    return raw.decode()
×
453

454
def eval_input(prompt='', float=True):
    """eval_input(prompt='', float=True)

    Drop-in for the python 2 - style built-in ``input``: reads a line
    with the ``pwnlib`` readline implementation and evaluates it with
    `pwnlib.util.safeeval.const` instead of ``eval`` (!).

    Arguments:
        prompt(str): Text displayed before the input.
        float(bool): When ``True``, prompt and input float to the bottom
                     of the screen while `term.term_mode` is enabled.

    Example:

        >>> try:
        ...     saved = sys.stdin, pwnlib.term.term_mode
        ...     pwnlib.term.term_mode = False
        ...     sys.stdin = io.TextIOWrapper(io.BytesIO(b"{'a': 20}"))
        ...     eval_input("Favorite object? ")['a']
        ... finally:
        ...     sys.stdin, pwnlib.term.term_mode = saved
        Favorite object? 20
    """
    from pwnlib.util import safeeval
    line = readline(-1, prompt, float)
    return safeeval.const(line)
1✔
479

480
def init():
    """Hook this module into ``sys.stdin`` and the input builtins.

    ``sys.stdin`` is replaced by a proxy whose ``readline`` calls this
    module's :func:`readline`.  On python 2 ``raw_input`` and ``input``
    are replaced by :func:`raw_input` and :func:`eval_input`; otherwise
    ``input`` is replaced by :func:`str_input`.
    """
    global safeeval
    # defer imports until initialization
    import sys
    from six.moves import builtins
    from pwnlib.util import safeeval

    # Proxy for the wrapped stream: only ``readline`` is overridden, all
    # other attribute lookups are forwarded to the stream.
    class Wrapper:
        def __init__(self, fd):
            self._fd = fd
        def readline(self, size = None):
            line = readline(size)
            # Text streams are expected to yield str, so decode with the
            # wrapped stream's own encoding and error handling.
            if isinstance(self._fd, io.TextIOWrapper):
                line = line.decode(encoding=self._fd.encoding, errors=self._fd.errors)
            return line
        def __getattr__(self, name):
            return getattr(self._fd, name)
    sys.stdin = Wrapper(sys.stdin)

    if not six.PY2:
        builtins.input = str_input
    else:
        builtins.raw_input = raw_input
        builtins.input = eval_input
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc