romis2012 / python-socks / 88

pending completion
88

push

travis-ci-com

romis2012
Read socks5 server bound address

26 of 26 new or added lines in 3 files covered. (100.0%)

1588 of 1596 relevant lines covered (99.5%)

3.65 hits per line

/python_socks/async_/asyncio/_connect.py (90.48% covered)
# Coverage: each executed line was hit 4 times; lines marked "not covered" had no hits.
import socket
import asyncio
from typing import Optional, Tuple

from ._resolver import Resolver
from ..._helpers import is_ipv4_address, is_ipv6_address


async def connect_tcp(
    host: str,
    port: int,
    loop: asyncio.AbstractEventLoop,
    local_addr: Optional[Tuple[str, int]] = None,
) -> socket.socket:

    family, host = await _resolve_host(host, loop)

    sock = socket.socket(family=family, type=socket.SOCK_STREAM)
    sock.setblocking(False)
    if local_addr is not None:  # pragma: no cover
        sock.bind(local_addr)

    if is_ipv6_address(host):
        # not covered
        address = (host, port, 0, 0)  # to fix OSError: [WinError 10022]
    else:
        address = (host, port)

    await loop.sock_connect(sock=sock, address=address)
    return sock


async def _resolve_host(host, loop):
    if is_ipv4_address(host):
        return socket.AF_INET, host
    if is_ipv6_address(host):
        # not covered
        return socket.AF_INET6, host

    resolver = Resolver(loop=loop)
    return await resolver.resolve(host=host)
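
The two lines with no hits in this report are the IPv6 branches: the one that builds the 4-tuple address in connect_tcp and the one that returns socket.AF_INET6 in _resolve_host. The sketch below illustrates that branching logic in isolation so it can be exercised without a network connection. It is a minimal sketch under assumptions, not the package's code: it uses the standard-library ipaddress module as a stand-in for is_ipv4_address / is_ipv6_address (whose implementation is not shown in this report), and classify_host and build_sockaddr are hypothetical names introduced only for illustration.

```python
import ipaddress
import socket
from typing import Optional, Tuple, Union


def classify_host(host: str) -> Optional[socket.AddressFamily]:
    """Return the address family for an IP literal, or None for a hostname.

    Hypothetical stand-in for the is_ipv4_address / is_ipv6_address checks.
    """
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal; the real code would fall through to a resolver.
        return None
    return socket.AF_INET6 if ip.version == 6 else socket.AF_INET


def build_sockaddr(
    host: str, port: int
) -> Union[Tuple[str, int], Tuple[str, int, int, int]]:
    """Build the address tuple passed to a socket connect call.

    AF_INET6 addresses take the form (host, port, flowinfo, scope_id);
    the source passes 0 for the last two fields.
    """
    if classify_host(host) == socket.AF_INET6:
        return (host, port, 0, 0)
    return (host, port)


if __name__ == "__main__":
    print(build_sockaddr("127.0.0.1", 1080))
    print(build_sockaddr("::1", 1080))
```

A test that passes an IPv6 literal such as "::1" as the host would be one way to reach both uncovered branches, provided the CI environment has IPv6 loopback available.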