• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 12013336921

25 Nov 2024 03:28PM UTC coverage: 99.025%. Remained the same
12013336921

Pull #273

github

web-flow
Merge 5405e2ea4 into a5b5e34b8
Pull Request #273: Target `make safetest` got broken

914 of 923 relevant lines covered (99.02%)

6.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.08
/mocket/ssl/socket.py
1
from __future__ import annotations
7✔
2

3
import ssl
7✔
4
from datetime import datetime, timedelta
7✔
5
from typing import Any
7✔
6

7
from mocket.compat import encode_to_bytes
7✔
8
from mocket.mocket import Mocket
7✔
9
from mocket.socket import MocketSocket
7✔
10
from mocket.types import _PeerCertRetDictType
7✔
11

12

13
class MocketSSLSocket(MocketSocket):
    """A ``MocketSocket`` that exposes part of the ``ssl.SSLSocket`` API.

    Instances are normally built through :meth:`_create`, which copies the
    state of an existing ``MocketSocket`` into a new SSL-flavoured socket.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        # Set by do_handshake(); read() consults it before raising.
        self._did_handshake = False
        # Becomes True the first time read() returns a non-empty value.
        self._sent_non_empty_bytes = False
        # The socket returned by unwrap(); replaced by _create().
        self._original_socket: MocketSocket = self

    def read(self, buffersize: int | None = None) -> bytes:
        """Read from ``self.io`` and return the result.

        Raises:
            ssl.SSLWantReadError: if ``do_handshake()`` has been called and
                no read on this socket has returned non-empty data so far.
        """
        rv = self.io.read(buffersize)
        if rv:
            self._sent_non_empty_bytes = True
        if self._did_handshake and not self._sent_non_empty_bytes:
            raise ssl.SSLWantReadError("The operation did not complete (read)")
        return rv

    def write(self, data: bytes) -> int | None:
        """Encode *data* to bytes and forward it to ``send``."""
        return self.send(encode_to_bytes(data))

    def do_handshake(self) -> None:
        """Record that the handshake took place; no real handshake is done."""
        self._did_handshake = True

    def getpeercert(self, binary_form: bool = False) -> _PeerCertRetDictType:
        """Return a fabricated peer certificate for the current host.

        The certificate expires 360 days from now and lists wildcard names
        for ``self._host``. ``binary_form`` is accepted for signature
        compatibility but is not used: a dict is always returned.
        """
        # Local import: ``timezone`` is not among the module-level imports.
        from datetime import timezone

        if not (self._host and self._port):
            # No address recorded on this socket: use Mocket's one.
            self._address = self._host, self._port = Mocket._address

        # The value is labelled "GMT", so compute it in UTC rather than in
        # local wall-clock time.
        now = datetime.now(timezone.utc)
        shift = now + timedelta(days=30 * 12)
        return {
            # Certificate time format includes the year, e.g.
            # "Jun 26 21:41:46 2011 GMT"; without it the value cannot be
            # parsed by ssl.cert_time_to_seconds().
            "notAfter": shift.strftime("%b %d %H:%M:%S %Y GMT"),
            "subjectAltName": (
                ("DNS", f"*.{self._host}"),
                ("DNS", self._host),
                ("DNS", "*"),
            ),
            "subject": (
                (("organizationName", f"*.{self._host}"),),
                (("organizationalUnitName", "Domain Control Validated"),),
                (("commonName", f"*.{self._host}"),),
            ),
        }

    def cipher(self) -> tuple[str, str, str]:
        """Return a fixed cipher description tuple.

        Named after ``ssl.SSLSocket.cipher`` so that code written against
        the real SSL socket API finds it.
        """
        return ("ADH", "AES256", "SHA")

    def ciper(self) -> tuple[str, str, str]:
        """Misspelled alias of :meth:`cipher`, kept for existing callers."""
        return self.cipher()

    def compression(self) -> str | None:
        """Return ``ssl.OP_NO_COMPRESSION``.

        NOTE(review): this is an option flag, not the ``str | None`` the
        annotation promises -- confirm whether ``None`` was intended.
        """
        return ssl.OP_NO_COMPRESSION

    def unwrap(self) -> MocketSocket:
        """Return the socket this SSL socket was created from."""
        return self._original_socket

    @classmethod
    def _create(
        cls,
        sock: MocketSocket,
        ssl_context: ssl.SSLContext | None = None,
        server_hostname: str | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> MocketSSLSocket:
        """Build an SSL socket that mirrors the state of *sock*.

        Args:
            sock: the socket whose state (true socket, timeout, address,
                io and entry) is copied onto the new instance.
            ssl_context: when given, used to wrap the underlying true socket.
            server_hostname: forwarded to ``ssl_context.wrap_socket``.
            *args: accepted for signature compatibility; unused.
            **kwargs: stored on the new instance as ``_kwargs``.

        Returns:
            A new instance of ``cls`` whose ``unwrap()`` returns *sock*.
        """
        # Instantiate ``cls`` (not the hard-coded base class) so subclasses
        # calling this alternate constructor get an instance of themselves.
        ssl_socket = cls()
        ssl_socket._original_socket = sock
        ssl_socket._true_socket = sock._true_socket

        if ssl_context:
            ssl_socket._true_socket = ssl_context.wrap_socket(
                sock=ssl_socket._true_socket,
                server_hostname=server_hostname,
            )

        ssl_socket._kwargs = kwargs

        ssl_socket._timeout = sock._timeout

        ssl_socket._host = sock._host
        ssl_socket._port = sock._port
        ssl_socket._address = sock._address

        ssl_socket._io = sock._io
        ssl_socket._entry = sock._entry

        return ssl_socket
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc