• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 12006176199

25 Nov 2024 08:36AM UTC coverage: 98.424% (-0.8%) from 99.255%
12006176199

Pull #271

github

web-flow
Merge 1f703d518 into 0da27224a
Pull Request #271: Recording: support removing headers from recorded data

13 of 13 new or added lines in 5 files covered. (100.0%)

8 existing lines in 3 files now uncovered.

937 of 952 relevant lines covered (98.42%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/mocket/ssl/socket.py
1
from __future__ import annotations
1✔
2

3
import ssl
1✔
4
from datetime import datetime, timedelta
1✔
5
from typing import Any
1✔
6

7
from mocket.compat import encode_to_bytes
1✔
8
from mocket.mocket import Mocket
1✔
9
from mocket.socket import MocketSocket
1✔
10
from mocket.types import _PeerCertRetDictType
1✔
11

12

13
class MocketSSLSocket(MocketSocket):
1✔
14
    def __init__(self, *args: Any, **kwargs: Any) -> None:
1✔
15
        super().__init__(*args, **kwargs)
1✔
16

17
        self._did_handshake = False
1✔
18
        self._sent_non_empty_bytes = False
1✔
19
        self._original_socket: MocketSocket = self
1✔
20

21
    def read(self, buffersize: int | None = None) -> bytes:
1✔
22
        rv = self.io.read(buffersize)
1✔
23
        if rv:
1✔
24
            self._sent_non_empty_bytes = True
1✔
25
        if self._did_handshake and not self._sent_non_empty_bytes:
1✔
UNCOV
26
            raise ssl.SSLWantReadError("The operation did not complete (read)")
×
27
        return rv
1✔
28

29
    def write(self, data: bytes) -> int | None:
1✔
30
        return self.send(encode_to_bytes(data))
1✔
31

32
    def do_handshake(self) -> None:
1✔
33
        self._did_handshake = True
1✔
34

35
    def getpeercert(self, binary_form: bool = False) -> _PeerCertRetDictType:
1✔
36
        if not (self._host and self._port):
1✔
37
            self._address = self._host, self._port = Mocket._address
1✔
38

39
        now = datetime.now()
1✔
40
        shift = now + timedelta(days=30 * 12)
1✔
41
        return {
1✔
42
            "notAfter": shift.strftime("%b %d %H:%M:%S GMT"),
43
            "subjectAltName": (
44
                ("DNS", f"*.{self._host}"),
45
                ("DNS", self._host),
46
                ("DNS", "*"),
47
            ),
48
            "subject": (
49
                (("organizationName", f"*.{self._host}"),),
50
                (("organizationalUnitName", "Domain Control Validated"),),
51
                (("commonName", f"*.{self._host}"),),
52
            ),
53
        }
54

55
    def ciper(self) -> tuple[str, str, str]:
1✔
56
        return ("ADH", "AES256", "SHA")
×
57

58
    def compression(self) -> str | None:
1✔
59
        return ssl.OP_NO_COMPRESSION
1✔
60

61
    def unwrap(self) -> MocketSocket:
1✔
62
        return self._original_socket
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc