• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 12029445091

26 Nov 2024 11:16AM UTC coverage: 98.68% (-0.002%) from 98.682%
12029445091

Pull #275

github

web-flow
Merge 008ef9603 into e529319e1
Pull Request #275: Small cleanup

2 of 3 new or added lines in 1 file covered. (66.67%)

7 existing lines in 2 files now uncovered.

972 of 985 relevant lines covered (98.68%)

6.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.11
/mocket/ssl/socket.py
1
from __future__ import annotations
7✔
2

3
import ssl
7✔
4
from datetime import datetime, timedelta
7✔
5
from ssl import Options
7✔
6
from typing import Any
7✔
7

8
from mocket.compat import encode_to_bytes
7✔
9
from mocket.mocket import Mocket
7✔
10
from mocket.socket import MocketSocket
7✔
11
from mocket.types import _PeerCertRetDictType
7✔
12

13

14
class MocketSSLSocket(MocketSocket):
    """SSL-flavoured variant of ``MocketSocket``.

    Adds the subset of the ``ssl.SSLSocket`` surface implemented below
    (``read``, ``write``, ``do_handshake``, ``getpeercert``, ``cipher``,
    ``compression``, ``unwrap``) on top of the plain mock socket.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base socket and reset the SSL bookkeeping flags."""
        super().__init__(*args, **kwargs)

        # Set by do_handshake(); read() consults it.
        self._did_handshake = False
        # Becomes True the first time read() returns a non-empty payload.
        self._sent_non_empty_bytes = False
        # Socket handed back by unwrap(); _create() points it at the wrapped one.
        self._original_socket: MocketSocket = self

    def read(self, buffersize: int | None = None) -> bytes:
        """Read up to *buffersize* bytes from the socket's ``io`` buffer.

        Raises:
            ssl.SSLWantReadError: if a handshake was performed and no
                non-empty data has been read from this socket so far.
        """
        rv = self.io.read(buffersize)
        if rv:
            self._sent_non_empty_bytes = True
        if self._did_handshake and not self._sent_non_empty_bytes:
            raise ssl.SSLWantReadError("The operation did not complete (read)")
        return rv

    def write(self, data: bytes) -> int | None:
        """Encode *data* to bytes and pass it to ``send``, returning its result."""
        return self.send(encode_to_bytes(data))

    def do_handshake(self) -> None:
        """Record that a handshake took place; no real handshake is performed."""
        self._did_handshake = True

    def getpeercert(self, binary_form: bool = False) -> _PeerCertRetDictType:
        """Return a fabricated peer certificate for the current host.

        The certificate names ``self._host`` (plus wildcard variants) and
        expires 360 days from now.  *binary_form* is accepted for signature
        compatibility but is not consulted: a dict is always returned.
        """
        # Function-scope import keeps this block self-contained; the module
        # header only imports ``datetime`` and ``timedelta``.
        from datetime import timezone

        if not (self._host and self._port):
            # No address on this socket yet: fall back to Mocket's address.
            self._address = self._host, self._port = Mocket._address

        # The value is labelled "GMT", so compute it in UTC rather than in
        # naive local time.
        expires = datetime.now(timezone.utc) + timedelta(days=30 * 12)
        return {
            # Certificate time strings carry the year
            # ("%b %d %H:%M:%S %Y GMT"); without it the date is ambiguous
            # and ssl.cert_time_to_seconds() cannot parse it.
            "notAfter": expires.strftime("%b %d %H:%M:%S %Y GMT"),
            "subjectAltName": (
                ("DNS", f"*.{self._host}"),
                ("DNS", self._host),
                ("DNS", "*"),
            ),
            "subject": (
                (("organizationName", f"*.{self._host}"),),
                (("organizationalUnitName", "Domain Control Validated"),),
                (("commonName", f"*.{self._host}"),),
            ),
        }

    def cipher(self) -> tuple[str, str, str]:
        """Return a fixed three-item cipher description.

        Named after ``ssl.SSLSocket.cipher`` so that code calling
        ``sock.cipher()`` on the mock finds it.
        """
        return "ADH", "AES256", "SHA"

    def ciper(self) -> tuple[str, str, str]:
        """Misspelled legacy name of :meth:`cipher`, kept for compatibility."""
        return self.cipher()

    def compression(self) -> Options:
        """Return ``ssl.OP_NO_COMPRESSION``."""
        return ssl.OP_NO_COMPRESSION

    def unwrap(self) -> MocketSocket:
        """Return the socket this SSL socket was created from."""
        return self._original_socket

    @classmethod
    def _create(
        cls,
        sock: MocketSocket,
        ssl_context: ssl.SSLContext | None = None,
        server_hostname: str | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> MocketSSLSocket:
        """Build an SSL socket that mirrors the state of *sock*.

        Args:
            sock: socket whose true socket, timeout, address, io buffer and
                entry are copied onto the new instance.
            ssl_context: when truthy, the true socket is wrapped with
                ``ssl_context.wrap_socket``.
            server_hostname: forwarded to ``wrap_socket``.
            *args: accepted and ignored.
            **kwargs: stored on the new socket as ``_kwargs``.
        """
        # Instantiate through ``cls`` so subclasses get their own type back.
        ssl_socket = cls()
        ssl_socket._original_socket = sock
        ssl_socket._true_socket = sock._true_socket

        if ssl_context:
            ssl_socket._true_socket = ssl_context.wrap_socket(
                sock=ssl_socket._true_socket,
                server_hostname=server_hostname,
            )

        ssl_socket._kwargs = kwargs

        ssl_socket._timeout = sock._timeout

        ssl_socket._host = sock._host
        ssl_socket._port = sock._port
        ssl_socket._address = sock._address

        ssl_socket._io = sock._io
        ssl_socket._entry = sock._entry

        return ssl_socket
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc