• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / fa8a69b4f1169c6285d6e9f59237bc4bc04d3d17-PR-2272

29 Oct 2023 01:56PM UTC coverage: 73.381% (+1.9%) from 71.511%
fa8a69b4f1169c6285d6e9f59237bc4bc04d3d17-PR-2272

Pull #2272

github-actions

web-flow
Merge 53d07ab96 into 3ff37a8b7
Pull Request #2272: Fix tube.clean_and_log not logging buffered data

3900 of 6416 branches covered (0.0%)

6 of 6 new or added lines in 1 file covered. (100.0%)

12251 of 16695 relevant lines covered (73.38%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pwnlib/gdb_api_bridge.py
1
"""GDB Python API bridge."""
2
import gdb
×
3

4
import socket
×
5
from threading import Condition
×
6
import time
×
7

8
from rpyc.core.protocol import Connection
×
9
from rpyc.core.service import Service
×
10
from rpyc.lib import spawn
×
11
from rpyc.lib.compat import select_error
×
12
from rpyc.utils.server import ThreadedServer
×
13

14

15
class ServeResult:
    """Result of serving requests on GDB thread.

    One side publishes the outcome with :meth:`set`; the other side blocks
    in :meth:`wait` until that outcome is available.
    """

    def __init__(self):
        self.cv = Condition()  # Guards ``done`` and ``exc``.
        self.done = False  # True once ``set`` has been called.
        self.exc = None  # Exception handed to ``set``, or None.

    def set(self, exc):
        """Publish the outcome and wake everyone blocked in :meth:`wait`.

        :param exc: exception to be re-raised by :meth:`wait`, or ``None``
            when serving finished without an error.
        """
        with self.cv:
            self.done = True
            self.exc = exc
            # ``done`` is only ever set once, so a waiter that is not woken
            # here would never be woken at all; notify_all() wakes all of
            # them rather than a single one.
            self.cv.notify_all()

    def wait(self):
        """Block until :meth:`set` has been called.

        :raises: the exception object passed to :meth:`set`, if it was not
            ``None``.
        """
        with self.cv:
            # wait_for() re-checks the predicate after every wakeup.
            self.cv.wait_for(lambda: self.done)
            if self.exc is not None:
                raise self.exc
×
34

35

36
class GdbConnection(Connection):
    """A Connection implementation that serves requests on GDB thread.

    Serving on GDB thread might not be ideal from the responsiveness
    perspective, however, it is simple and reliable.
    """

    SERVE_TIME = 0.1  # Number of seconds to serve.
    IDLE_TIME = 0.1  # Number of seconds to wait after serving.

    def serve_gdb_thread(self, serve_result):
        """Serve requests on GDB thread.

        Calls the base class ``serve`` repeatedly for up to ``SERVE_TIME``
        seconds, then reports the outcome through ``serve_result``.

        :param serve_result: a :class:`ServeResult`; its ``set`` method is
            called with the raised exception, or with ``None`` on success.
        """
        try:
            # Use a monotonic clock: a wall-clock adjustment must neither
            # stretch nor cut short the time slice spent serving.
            deadline = time.monotonic() + self.SERVE_TIME
            while True:
                timeout = deadline - time.monotonic()
                if timeout < 0:
                    break
                super().serve(timeout=timeout)
        except Exception as exc:
            # Hand the error over to the thread blocked in ``wait``.
            serve_result.set(exc)
        else:
            serve_result.set(None)

    def serve_all(self):
        """Modified version of rpyc.core.protocol.Connection.serve_all.

        Until the connection is closed, repeatedly posts
        :meth:`serve_gdb_thread` as a GDB event, waits for it to finish and
        then sleeps for ``IDLE_TIME`` seconds. The connection is always
        closed on exit.

        :raises socket.error, select_error, IOError: re-raised only when the
            connection is not already closed.
        """
        try:
            while not self.closed:
                serve_result = ServeResult()
                gdb.post_event(lambda: self.serve_gdb_thread(serve_result))
                # Blocks until the posted event has run; re-raises any
                # exception captured by serve_gdb_thread.
                serve_result.wait()
                time.sleep(self.IDLE_TIME)
        except (socket.error, select_error, IOError):
            # Such errors are ignored once the connection is closed.
            if not self.closed:
                raise
        except EOFError:
            pass
        finally:
            self.close()
×
74

75

76
class GdbService(Service):
    """A public interface for Pwntools."""

    _protocol = GdbConnection  # Connection subclass.
    exposed_gdb = gdb  # ``gdb`` module.

    def exposed_set_breakpoint(self, client, has_stop, *args, **kwargs):
        """Create a breakpoint and connect it with the client-side mirror.

        :param client: client-side object; its ``stop()`` is called from the
            breakpoint's ``stop`` hook when ``has_stop`` is true.
        :param has_stop: whether the created breakpoint gets a ``stop`` hook.
        :param args: positional arguments for the breakpoint constructor.
        :param kwargs: keyword arguments for the breakpoint constructor.
        :return: the created breakpoint object.
        """
        if not has_stop:
            # No hook requested: a plain breakpoint is enough.
            return gdb.Breakpoint(*args, **kwargs)

        class Breakpoint(gdb.Breakpoint):
            def stop(self):
                return client.stop()

        return Breakpoint(*args, **kwargs)

    def exposed_set_finish_breakpoint(self, client, has_stop, has_out_of_scope, *args, **kwargs):
        """Create a finish breakpoint and connect it with the client-side mirror.

        :param client: client-side object whose ``stop()`` and
            ``out_of_scope()`` are called from the matching hooks.
        :param has_stop: whether to define the ``stop`` hook.
        :param has_out_of_scope: whether to define the ``out_of_scope`` hook.
        :param args: positional arguments for the breakpoint constructor.
        :param kwargs: keyword arguments for the breakpoint constructor.
        :return: the created finish breakpoint object.
        """
        class FinishBreakpoint(gdb.FinishBreakpoint):
            # Each hook is defined only when requested, so the subclass does
            # not add a method the client has no counterpart for.
            if has_stop:
                def stop(self):
                    return client.stop()

            if has_out_of_scope:
                def out_of_scope(self):
                    client.out_of_scope()

        return FinishBreakpoint(*args, **kwargs)

    def exposed_quit(self):
        """Terminate GDB."""
        def run_quit():
            return gdb.execute('quit')

        # The command is posted as a GDB event rather than executed inline.
        gdb.post_event(run_quit)
×
106

107

108
# Start the RPyC server in the background via rpyc.lib.spawn.
# NOTE(review): ``socket_path`` is not defined in the visible part of this
# file; presumably it is provided by whatever loads this script - confirm.
spawn(
    ThreadedServer(
        service=GdbService(),
        socket_path=socket_path,
        protocol_config={
            'allow_all_attrs': True,
        },
    ).start
)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc