• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 12227192999

26 Nov 2024 11:20AM UTC coverage: 98.68% (-0.002%) from 98.682%
12227192999

push

github

web-flow
Small cleanup (#275)

* Small cleanup.

2 of 3 new or added lines in 1 file covered. (66.67%)

7 existing lines in 2 files now uncovered.

972 of 985 relevant lines covered (98.68%)

6.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.39
/mocket/socket.py
1
from __future__ import annotations
7✔
2

3
import contextlib
7✔
4
import errno
7✔
5
import os
7✔
6
import select
7✔
7
import socket
7✔
8
from types import TracebackType
7✔
9
from typing import Any, Type
7✔
10

11
from typing_extensions import Self
7✔
12

13
from mocket.entry import MocketEntry
7✔
14
from mocket.io import MocketSocketIO
7✔
15
from mocket.mocket import Mocket
7✔
16
from mocket.mode import MocketMode
7✔
17
from mocket.types import (
7✔
18
    Address,
19
    ReadableBuffer,
20
    WriteableBuffer,
21
    _RetAddress,
22
)
23

24
true_gethostbyname = socket.gethostbyname
7✔
25
true_socket = socket.socket
7✔
26

27

28
def mock_create_connection(address, timeout=None, source_address=None):
7✔
29
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
7✔
30
    if timeout:
7✔
31
        s.settimeout(timeout)
7✔
32
    s.connect(address)
7✔
33
    return s
7✔
34

35

36
def mock_getaddrinfo(
7✔
37
    host: str,
38
    port: int,
39
    family: int = 0,
40
    type: int = 0,
41
    proto: int = 0,
42
    flags: int = 0,
43
) -> list[tuple[int, int, int, str, tuple[str, int]]]:
44
    return [(2, 1, 6, "", (host, port))]
7✔
45

46

47
def mock_gethostbyname(hostname: str) -> str:
7✔
48
    return "127.0.0.1"
7✔
49

50

51
def mock_gethostname() -> str:
7✔
52
    return "localhost"
7✔
53

54

55
def mock_inet_pton(address_family: int, ip_string: str) -> bytes:
7✔
56
    return bytes("\x7f\x00\x00\x01", "utf-8")
7✔
57

58

59
def mock_socketpair(*args, **kwargs):
7✔
60
    """Returns a real socketpair() used by asyncio loop for supporting calls made by fastapi and similar services."""
61
    import _socket
7✔
62

63
    return _socket.socketpair(*args, **kwargs)
7✔
64

65

66
class MocketSocket:
7✔
67
    def __init__(
7✔
68
        self,
69
        family: socket.AddressFamily | int = socket.AF_INET,
70
        type: socket.SocketKind | int = socket.SOCK_STREAM,
71
        proto: int = 0,
72
        fileno: int | None = None,
73
        **kwargs: Any,
74
    ) -> None:
75
        self._family = family
7✔
76
        self._type = type
7✔
77
        self._proto = proto
7✔
78

79
        self._kwargs = kwargs
7✔
80
        self._true_socket = true_socket(family, type, proto)
7✔
81

82
        self._buflen = 65536
7✔
83
        self._timeout: float | None = None
7✔
84

85
        self._host = None
7✔
86
        self._port = None
7✔
87
        self._address = None
7✔
88

89
        self._io = None
7✔
90
        self._entry = None
7✔
91

92
    def __str__(self) -> str:
93
        return f"({self.__class__.__name__})(family={self.family} type={self.type} protocol={self.proto})"
94

95
    def __enter__(self) -> Self:
7✔
96
        return self
7✔
97

98
    def __exit__(
7✔
99
        self,
100
        type_: Type[BaseException] | None,  # noqa: UP006
101
        value: BaseException | None,
102
        traceback: TracebackType | None,
103
    ) -> None:
104
        self.close()
7✔
105

106
    @property
7✔
107
    def family(self) -> int:
7✔
108
        return self._family
7✔
109

110
    @property
7✔
111
    def type(self) -> int:
7✔
112
        return self._type
7✔
113

114
    @property
7✔
115
    def proto(self) -> int:
7✔
116
        return self._proto
7✔
117

118
    @property
7✔
119
    def io(self) -> MocketSocketIO:
7✔
120
        if self._io is None:
7✔
121
            self._io = MocketSocketIO((self._host, self._port))
7✔
122
        return self._io
7✔
123

124
    def fileno(self) -> int:
7✔
125
        address = (self._host, self._port)
7✔
126
        r_fd, _ = Mocket.get_pair(address)
7✔
127
        if not r_fd:
7✔
128
            r_fd, w_fd = os.pipe()
7✔
129
            Mocket.set_pair(address, (r_fd, w_fd))
7✔
130
        return r_fd
7✔
131

132
    def gettimeout(self) -> float | None:
7✔
133
        return self._timeout
7✔
134

135
    # FIXME the arguments here seem wrong. they should be `level: int, optname: int, value: int | ReadableBuffer | None`
136
    def setsockopt(self, family: int, type: int, proto: int) -> None:
7✔
137
        self._family = family
7✔
138
        self._type = type
7✔
139
        self._proto = proto
7✔
140

141
        if self._true_socket:
7✔
142
            self._true_socket.setsockopt(family, type, proto)
7✔
143

144
    def settimeout(self, timeout: float | None) -> None:
7✔
145
        self._timeout = timeout
7✔
146

147
    @staticmethod
7✔
148
    def getsockopt(level: int, optname: int, buflen: int | None = None) -> int:
7✔
149
        return socket.SOCK_STREAM
×
150

151
    def getpeername(self) -> _RetAddress:
7✔
152
        return self._address
7✔
153

154
    def setblocking(self, block: bool) -> None:
7✔
155
        self.settimeout(None) if block else self.settimeout(0.0)
7✔
156

157
    def getblocking(self) -> bool:
7✔
158
        return self.gettimeout() is None
7✔
159

160
    def getsockname(self) -> _RetAddress:
7✔
161
        return true_gethostbyname(self._address[0]), self._address[1]
7✔
162

163
    def connect(self, address: Address) -> None:
7✔
164
        self._address = self._host, self._port = address
7✔
165
        Mocket._address = address
7✔
166

167
    def makefile(self, mode: str = "r", bufsize: int = -1) -> MocketSocketIO:
7✔
168
        return self.io
7✔
169

170
    def get_entry(self, data: bytes) -> MocketEntry | None:
7✔
171
        return Mocket.get_entry(self._host, self._port, data)
7✔
172

173
    def sendall(self, data, entry=None, *args, **kwargs):
7✔
174
        if entry is None:
7✔
175
            entry = self.get_entry(data)
7✔
176

177
        if entry:
7✔
178
            consume_response = entry.collect(data)
7✔
179
            response = entry.get_response() if consume_response is not False else None
7✔
180
        else:
181
            response = self.true_sendall(data, *args, **kwargs)
7✔
182

183
        if response is not None:
7✔
184
            self.io.seek(0)
7✔
185
            self.io.write(response)
7✔
186
            self.io.truncate()
7✔
187
            self.io.seek(0)
7✔
188

189
    def recv_into(
7✔
190
        self,
191
        buffer: WriteableBuffer,
192
        buffersize: int | None = None,
193
        flags: int | None = None,
194
    ) -> int:
195
        if hasattr(buffer, "write"):
7✔
196
            return buffer.write(self.recv(buffersize))
7✔
197

198
        # buffer is a memoryview
UNCOV
199
        if buffersize is None:
3✔
UNCOV
200
            buffersize = len(buffer)
3✔
201

UNCOV
202
        data = self.recv(buffersize)
3✔
UNCOV
203
        if data:
3✔
UNCOV
204
            buffer[: len(data)] = data
3✔
UNCOV
205
        return len(data)
3✔
206

207
    def recv(self, buffersize: int, flags: int | None = None) -> bytes:
7✔
208
        r_fd, _ = Mocket.get_pair((self._host, self._port))
7✔
209
        if r_fd:
7✔
210
            return os.read(r_fd, buffersize)
7✔
211
        data = self.io.read(buffersize)
7✔
212
        if data:
7✔
213
            return data
7✔
214
        # used by Redis mock
215
        exc = BlockingIOError()
7✔
216
        exc.errno = errno.EWOULDBLOCK
7✔
217
        exc.args = (0,)
7✔
218
        raise exc
7✔
219

220
    def true_sendall(self, data: bytes, *args: Any, **kwargs: Any) -> bytes:
7✔
221
        if not MocketMode().is_allowed(self._address):
7✔
222
            MocketMode.raise_not_allowed()
7✔
223

224
        # try to get the response from recordings
225
        if Mocket._record_storage:
7✔
226
            record = Mocket._record_storage.get_record(
7✔
227
                address=self._address,
228
                request=data,
229
            )
230
            if record is not None:
7✔
231
                return record.response
7✔
232

233
        host, port = self._address
7✔
234
        host = true_gethostbyname(host)
7✔
235

236
        with contextlib.suppress(OSError, ValueError):
7✔
237
            # already connected
238
            self._true_socket.connect((host, port))
7✔
239

240
        self._true_socket.sendall(data, *args, **kwargs)
7✔
241
        response = b""
7✔
242
        # https://github.com/kennethreitz/requests/blob/master/tests/testserver/server.py#L12
243
        while True:
5✔
244
            more_to_read = select.select([self._true_socket], [], [], 0.1)[0]
7✔
245
            if not more_to_read and response:
7✔
246
                break
7✔
247
            new_content = self._true_socket.recv(self._buflen)
7✔
248
            if not new_content:
7✔
249
                break
7✔
250
            response += new_content
7✔
251

252
        # store request+response in recordings
253
        if Mocket._record_storage:
7✔
254
            Mocket._record_storage.put_record(
7✔
255
                address=self._address,
256
                request=data,
257
                response=response,
258
            )
259

260
        return response
7✔
261

262
    def send(
263
        self,
264
        data: ReadableBuffer,
265
        *args: Any,
266
        **kwargs: Any,
267
    ) -> int:  # pragma: no cover
268
        entry = self.get_entry(data)
269
        if not entry or (entry and self._entry != entry):
270
            kwargs["entry"] = entry
271
            self.sendall(data, *args, **kwargs)
272
        else:
273
            req = Mocket.last_request()
274
            if hasattr(req, "add_data"):
275
                req.add_data(data)
276
        self._entry = entry
277
        return len(data)
278

279
    def close(self) -> None:
7✔
280
        if self._true_socket and not self._true_socket._closed:
7✔
281
            self._true_socket.close()
7✔
282

283
    def __getattr__(self, name: str) -> Any:
7✔
284
        """Do nothing catchall function, for methods like shutdown()"""
285

286
        def do_nothing(*args: Any, **kwargs: Any) -> Any:
7✔
287
            pass
7✔
288

289
        return do_nothing
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc