• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

romis2012 / python-socks / 88

pending completion
88

push

travis-ci-com

romis2012
Read socks5 server bound address

26 of 26 new or added lines in 3 files covered. (100.0%)

1588 of 1596 relevant lines covered (99.5%)

3.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/python_socks/async_/anyio/_stream.py
1
import ssl
4✔
2
from typing import Union
4✔
3

4
import anyio
4✔
5
import anyio.abc
4✔
6
from anyio.streams.tls import TLSStream
4✔
7

8
from ..._errors import ProxyError
4✔
9
from ... import _abc as abc
4✔
10

11
DEFAULT_RECEIVE_SIZE = 65536
4✔
12

13
AnyioStreamType = Union[anyio.abc.SocketStream, TLSStream]
4✔
14

15

16
class AnyioSocketStream(abc.AsyncSocketStream):
4✔
17
    _stream: AnyioStreamType
4✔
18

19
    def __init__(self, stream: AnyioStreamType) -> None:
4✔
20
        self._stream = stream
4✔
21

22
    async def write_all(self, data: bytes):
4✔
23
        await self._stream.send(item=data)
4✔
24

25
    async def read(self, max_bytes: int = DEFAULT_RECEIVE_SIZE):
4✔
26
        try:
4✔
27
            return await self._stream.receive(max_bytes=max_bytes)
4✔
28
        except anyio.EndOfStream:  # pragma: no cover
29
            return b""
30

31
    async def read_exact(self, n: int):
4✔
32
        data = bytearray()
4✔
33
        while len(data) < n:
4✔
34
            packet = await self.read(n - len(data))
4✔
35
            if not packet:  # pragma: no cover
36
                raise ProxyError('Connection closed unexpectedly')
37
            data += packet
4✔
38
        return data
4✔
39

40
    async def start_tls(
4✔
41
        self,
42
        hostname: str,
43
        ssl_context: ssl.SSLContext,
44
    ) -> 'AnyioSocketStream':
45
        ssl_stream = await TLSStream.wrap(
4✔
46
            self._stream,
47
            ssl_context=ssl_context,
48
            hostname=hostname,
49
            standard_compatible=False,
50
            server_side=False,
51
        )
52
        return AnyioSocketStream(ssl_stream)
4✔
53

54
    async def close(self):
4✔
55
        await self._stream.aclose()
4✔
56

57
    @property
4✔
58
    def anyio_stream(self) -> AnyioStreamType:  # pragma: no cover
59
        return self._stream
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc