• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10717947310

05 Sep 2024 09:44AM UTC coverage: 91.681% (-0.01%) from 91.695%
10717947310

Pull #782

github

web-flow
Merge f76b139ac into 0c8ad519e
Pull Request #782: Accept abstract namespace paths for unix domain sockets

9 of 10 new or added lines in 1 file covered. (90.0%)

1 existing line in 1 file now uncovered.

4794 of 5229 relevant lines covered (91.68%)

9.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.0
/src/anyio/_core/_sockets.py
1
from __future__ import annotations
11✔
2

3
import errno
11✔
4
import os
11✔
5
import socket
11✔
6
import ssl
11✔
7
import stat
11✔
8
import sys
11✔
9
from collections.abc import Awaitable
11✔
10
from ipaddress import IPv6Address, ip_address
11✔
11
from os import PathLike, chmod
11✔
12
from socket import AddressFamily, SocketKind
11✔
13
from typing import Any, Literal, cast, overload
11✔
14

15
from .. import to_thread
11✔
16
from ..abc import (
11✔
17
    ConnectedUDPSocket,
18
    ConnectedUNIXDatagramSocket,
19
    IPAddressType,
20
    IPSockAddrType,
21
    SocketListener,
22
    SocketStream,
23
    UDPSocket,
24
    UNIXDatagramSocket,
25
    UNIXSocketStream,
26
)
27
from ..streams.stapled import MultiListener
11✔
28
from ..streams.tls import TLSStream
11✔
29
from ._eventloop import get_async_backend
11✔
30
from ._resources import aclose_forcefully
11✔
31
from ._synchronization import Event
11✔
32
from ._tasks import create_task_group, move_on_after
11✔
33

34
if sys.version_info < (3, 11):
11✔
35
    from exceptiongroup import ExceptionGroup
6✔
36

37
IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)  # https://bugs.python.org/issue29515
11✔
38

39
AnyIPAddressFamily = Literal[
11✔
40
    AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
41
]
42
IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
11✔
43

44

45
# tls_hostname given
46
@overload
11✔
47
async def connect_tcp(
11✔
48
    remote_host: IPAddressType,
49
    remote_port: int,
50
    *,
51
    local_host: IPAddressType | None = ...,
52
    ssl_context: ssl.SSLContext | None = ...,
53
    tls_standard_compatible: bool = ...,
54
    tls_hostname: str,
55
    happy_eyeballs_delay: float = ...,
56
) -> TLSStream: ...
57

58

59
# ssl_context given
60
@overload
11✔
61
async def connect_tcp(
11✔
62
    remote_host: IPAddressType,
63
    remote_port: int,
64
    *,
65
    local_host: IPAddressType | None = ...,
66
    ssl_context: ssl.SSLContext,
67
    tls_standard_compatible: bool = ...,
68
    tls_hostname: str | None = ...,
69
    happy_eyeballs_delay: float = ...,
70
) -> TLSStream: ...
71

72

73
# tls=True
74
@overload
11✔
75
async def connect_tcp(
11✔
76
    remote_host: IPAddressType,
77
    remote_port: int,
78
    *,
79
    local_host: IPAddressType | None = ...,
80
    tls: Literal[True],
81
    ssl_context: ssl.SSLContext | None = ...,
82
    tls_standard_compatible: bool = ...,
83
    tls_hostname: str | None = ...,
84
    happy_eyeballs_delay: float = ...,
85
) -> TLSStream: ...
86

87

88
# tls=False
89
@overload
11✔
90
async def connect_tcp(
11✔
91
    remote_host: IPAddressType,
92
    remote_port: int,
93
    *,
94
    local_host: IPAddressType | None = ...,
95
    tls: Literal[False],
96
    ssl_context: ssl.SSLContext | None = ...,
97
    tls_standard_compatible: bool = ...,
98
    tls_hostname: str | None = ...,
99
    happy_eyeballs_delay: float = ...,
100
) -> SocketStream: ...
101

102

103
# No TLS arguments
104
@overload
11✔
105
async def connect_tcp(
11✔
106
    remote_host: IPAddressType,
107
    remote_port: int,
108
    *,
109
    local_host: IPAddressType | None = ...,
110
    happy_eyeballs_delay: float = ...,
111
) -> SocketStream: ...
112

113

114
async def connect_tcp(
11✔
115
    remote_host: IPAddressType,
116
    remote_port: int,
117
    *,
118
    local_host: IPAddressType | None = None,
119
    tls: bool = False,
120
    ssl_context: ssl.SSLContext | None = None,
121
    tls_standard_compatible: bool = True,
122
    tls_hostname: str | None = None,
123
    happy_eyeballs_delay: float = 0.25,
124
) -> SocketStream | TLSStream:
125
    """
126
    Connect to a host using the TCP protocol.
127

128
    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
129
    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
130
    each one is tried until one connection attempt succeeds. If the first attempt does
131
    not connected within 250 milliseconds, a second attempt is started using the next
132
    address in the list, and so on. On IPv6 enabled systems, an IPv6 address (if
133
    available) is tried first.
134

135
    When the connection has been established, a TLS handshake will be done if either
136
    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.
137

138
    :param remote_host: the IP address or host name to connect to
139
    :param remote_port: port on the target host to connect to
140
    :param local_host: the interface address or name to bind the socket to before
141
        connecting
142
    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
143
        :class:`~anyio.streams.tls.TLSStream` instead
144
    :param ssl_context: the SSL context object to use (if omitted, a default context is
145
        created)
146
    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
147
        before closing the stream and requires that the server does this as well.
148
        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
149
        Some protocols, such as HTTP, require this option to be ``False``.
150
        See :meth:`~ssl.SSLContext.wrap_socket` for details.
151
    :param tls_hostname: host name to check the server certificate against (defaults to
152
        the value of ``remote_host``)
153
    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
154
        attempt
155
    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
156
    :raises OSError: if the connection attempt fails
157

158
    """
159
    # Placed here due to https://github.com/python/mypy/issues/7057
160
    connected_stream: SocketStream | None = None
11✔
161

162
    async def try_connect(remote_host: str, event: Event) -> None:
11✔
163
        nonlocal connected_stream
164
        try:
11✔
165
            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
11✔
166
        except OSError as exc:
11✔
167
            oserrors.append(exc)
11✔
168
            return
11✔
169
        else:
170
            if connected_stream is None:
11✔
171
                connected_stream = stream
11✔
172
                tg.cancel_scope.cancel()
11✔
173
            else:
174
                await stream.aclose()
×
175
        finally:
176
            event.set()
11✔
177

178
    asynclib = get_async_backend()
11✔
179
    local_address: IPSockAddrType | None = None
11✔
180
    family = socket.AF_UNSPEC
11✔
181
    if local_host:
11✔
182
        gai_res = await getaddrinfo(str(local_host), None)
×
183
        family, *_, local_address = gai_res[0]
×
184

185
    target_host = str(remote_host)
11✔
186
    try:
11✔
187
        addr_obj = ip_address(remote_host)
11✔
188
    except ValueError:
11✔
189
        # getaddrinfo() will raise an exception if name resolution fails
190
        gai_res = await getaddrinfo(
11✔
191
            target_host, remote_port, family=family, type=socket.SOCK_STREAM
192
        )
193

194
        # Organize the list so that the first address is an IPv6 address (if available)
195
        # and the second one is an IPv4 addresses. The rest can be in whatever order.
196
        v6_found = v4_found = False
11✔
197
        target_addrs: list[tuple[socket.AddressFamily, str]] = []
11✔
198
        for af, *rest, sa in gai_res:
11✔
199
            if af == socket.AF_INET6 and not v6_found:
11✔
200
                v6_found = True
11✔
201
                target_addrs.insert(0, (af, sa[0]))
11✔
202
            elif af == socket.AF_INET and not v4_found and v6_found:
11✔
203
                v4_found = True
7✔
204
                target_addrs.insert(1, (af, sa[0]))
7✔
205
            else:
206
                target_addrs.append((af, sa[0]))
11✔
207
    else:
208
        if isinstance(addr_obj, IPv6Address):
11✔
209
            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
11✔
210
        else:
211
            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
11✔
212

213
    oserrors: list[OSError] = []
11✔
214
    async with create_task_group() as tg:
11✔
215
        for i, (af, addr) in enumerate(target_addrs):
11✔
216
            event = Event()
11✔
217
            tg.start_soon(try_connect, addr, event)
11✔
218
            with move_on_after(happy_eyeballs_delay):
11✔
219
                await event.wait()
11✔
220

221
    if connected_stream is None:
11✔
222
        cause = (
11✔
223
            oserrors[0]
224
            if len(oserrors) == 1
225
            else ExceptionGroup("multiple connection attempts failed", oserrors)
226
        )
227
        raise OSError("All connection attempts failed") from cause
11✔
228

229
    if tls or tls_hostname or ssl_context:
11✔
230
        try:
11✔
231
            return await TLSStream.wrap(
11✔
232
                connected_stream,
233
                server_side=False,
234
                hostname=tls_hostname or str(remote_host),
235
                ssl_context=ssl_context,
236
                standard_compatible=tls_standard_compatible,
237
            )
238
        except BaseException:
11✔
239
            await aclose_forcefully(connected_stream)
11✔
240
            raise
11✔
241

242
    return connected_stream
11✔
243

244

245
async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
11✔
246
    """
247
    Connect to the given UNIX socket.
248

249
    Not available on Windows.
250

251
    :param path: path to the socket
252
    :return: a socket stream object
253

254
    """
255
    path = os.fspath(path)
8✔
256
    return await get_async_backend().connect_unix(path)
8✔
257

258

259
async def create_tcp_listener(
11✔
260
    *,
261
    local_host: IPAddressType | None = None,
262
    local_port: int = 0,
263
    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
264
    backlog: int = 65536,
265
    reuse_port: bool = False,
266
) -> MultiListener[SocketStream]:
267
    """
268
    Create a TCP socket listener.
269

270
    :param local_port: port number to listen on
271
    :param local_host: IP address of the interface to listen on. If omitted, listen on
272
        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
273
        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
274
    :param family: address family (used if ``local_host`` was omitted)
275
    :param backlog: maximum number of queued incoming connections (up to a maximum of
276
        2**16, or 65536)
277
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
278
        address/port (not supported on Windows)
279
    :return: a list of listener objects
280

281
    """
282
    asynclib = get_async_backend()
11✔
283
    backlog = min(backlog, 65536)
11✔
284
    local_host = str(local_host) if local_host is not None else None
11✔
285
    gai_res = await getaddrinfo(
11✔
286
        local_host,
287
        local_port,
288
        family=family,
289
        type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
290
        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
291
    )
292
    listeners: list[SocketListener] = []
11✔
293
    try:
11✔
294
        # The set() is here to work around a glibc bug:
295
        # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
296
        sockaddr: tuple[str, int] | tuple[str, int, int, int]
297
        for fam, kind, *_, sockaddr in sorted(set(gai_res)):
11✔
298
            # Workaround for an uvloop bug where we don't get the correct scope ID for
299
            # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
300
            # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
301
            if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
11✔
302
                continue
9✔
303

304
            raw_socket = socket.socket(fam)
11✔
305
            raw_socket.setblocking(False)
11✔
306

307
            # For Windows, enable exclusive address use. For others, enable address
308
            # reuse.
309
            if sys.platform == "win32":
11✔
310
                raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
2✔
311
            else:
312
                raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
9✔
313

314
            if reuse_port:
11✔
315
                raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
9✔
316

317
            # If only IPv6 was requested, disable dual stack operation
318
            if fam == socket.AF_INET6:
11✔
319
                raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
11✔
320

321
                # Workaround for #554
322
                if "%" in sockaddr[0]:
11✔
323
                    addr, scope_id = sockaddr[0].split("%", 1)
8✔
324
                    sockaddr = (addr, sockaddr[1], 0, int(scope_id))
8✔
325

326
            raw_socket.bind(sockaddr)
11✔
327
            raw_socket.listen(backlog)
11✔
328
            listener = asynclib.create_tcp_listener(raw_socket)
11✔
329
            listeners.append(listener)
11✔
330
    except BaseException:
×
331
        for listener in listeners:
×
332
            await listener.aclose()
×
333

334
        raise
×
335

336
    return MultiListener(listeners)
11✔
337

338

339
async def create_unix_listener(
11✔
340
    path: str | bytes | PathLike[Any],
341
    *,
342
    mode: int | None = None,
343
    backlog: int = 65536,
344
) -> SocketListener:
345
    """
346
    Create a UNIX socket listener.
347

348
    Not available on Windows.
349

350
    :param path: path of the socket
351
    :param mode: permissions to set on the socket
352
    :param backlog: maximum number of queued incoming connections (up to a maximum of
353
        2**16, or 65536)
354
    :return: a listener object
355

356
    .. versionchanged:: 3.0
357
        If a socket already exists on the file system in the given path, it will be
358
        removed first.
359

360
    """
361
    backlog = min(backlog, 65536)
8✔
362
    raw_socket = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
8✔
363
    try:
8✔
364
        raw_socket.listen(backlog)
8✔
365
        return get_async_backend().create_unix_listener(raw_socket)
8✔
366
    except BaseException:
×
367
        raw_socket.close()
×
368
        raise
×
369

370

371
async def create_udp_socket(
11✔
372
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
373
    *,
374
    local_host: IPAddressType | None = None,
375
    local_port: int = 0,
376
    reuse_port: bool = False,
377
) -> UDPSocket:
378
    """
379
    Create a UDP socket.
380

381
    If ``port`` has been given, the socket will be bound to this port on the local
382
    machine, making this socket suitable for providing UDP based services.
383

384
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
385
        determined from ``local_host`` if omitted
386
    :param local_host: IP address or host name of the local interface to bind to
387
    :param local_port: local port to bind to
388
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
389
        address/port (not supported on Windows)
390
    :return: a UDP socket
391

392
    """
393
    if family is AddressFamily.AF_UNSPEC and not local_host:
10✔
394
        raise ValueError('Either "family" or "local_host" must be given')
×
395

396
    if local_host:
10✔
397
        gai_res = await getaddrinfo(
10✔
398
            str(local_host),
399
            local_port,
400
            family=family,
401
            type=socket.SOCK_DGRAM,
402
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
403
        )
404
        family = cast(AnyIPAddressFamily, gai_res[0][0])
10✔
405
        local_address = gai_res[0][-1]
10✔
406
    elif family is AddressFamily.AF_INET6:
10✔
407
        local_address = ("::", 0)
10✔
408
    else:
409
        local_address = ("0.0.0.0", 0)
10✔
410

411
    sock = await get_async_backend().create_udp_socket(
10✔
412
        family, local_address, None, reuse_port
413
    )
414
    return cast(UDPSocket, sock)
10✔
415

416

417
async def create_connected_udp_socket(
11✔
418
    remote_host: IPAddressType,
419
    remote_port: int,
420
    *,
421
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
422
    local_host: IPAddressType | None = None,
423
    local_port: int = 0,
424
    reuse_port: bool = False,
425
) -> ConnectedUDPSocket:
426
    """
427
    Create a connected UDP socket.
428

429
    Connected UDP sockets can only communicate with the specified remote host/port, an
430
    any packets sent from other sources are dropped.
431

432
    :param remote_host: remote host to set as the default target
433
    :param remote_port: port on the remote host to set as the default target
434
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
435
        determined from ``local_host`` or ``remote_host`` if omitted
436
    :param local_host: IP address or host name of the local interface to bind to
437
    :param local_port: local port to bind to
438
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
439
        address/port (not supported on Windows)
440
    :return: a connected UDP socket
441

442
    """
443
    local_address = None
10✔
444
    if local_host:
10✔
445
        gai_res = await getaddrinfo(
10✔
446
            str(local_host),
447
            local_port,
448
            family=family,
449
            type=socket.SOCK_DGRAM,
450
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
451
        )
452
        family = cast(AnyIPAddressFamily, gai_res[0][0])
10✔
453
        local_address = gai_res[0][-1]
10✔
454

455
    gai_res = await getaddrinfo(
10✔
456
        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
457
    )
458
    family = cast(AnyIPAddressFamily, gai_res[0][0])
10✔
459
    remote_address = gai_res[0][-1]
10✔
460

461
    sock = await get_async_backend().create_udp_socket(
10✔
462
        family, local_address, remote_address, reuse_port
463
    )
464
    return cast(ConnectedUDPSocket, sock)
10✔
465

466

467
async def create_unix_datagram_socket(
11✔
468
    *,
469
    local_path: None | str | bytes | PathLike[Any] = None,
470
    local_mode: int | None = None,
471
) -> UNIXDatagramSocket:
472
    """
473
    Create a UNIX datagram socket.
474

475
    Not available on Windows.
476

477
    If ``local_path`` has been given, the socket will be bound to this path, making this
478
    socket suitable for receiving datagrams from other processes. Other processes can
479
    send datagrams to this socket only if ``local_path`` is set.
480

481
    If a socket already exists on the file system in the ``local_path``, it will be
482
    removed first.
483

484
    :param local_path: the path on which to bind to
485
    :param local_mode: permissions to set on the local socket
486
    :return: a UNIX datagram socket
487

488
    """
489
    raw_socket = await setup_unix_local_socket(
8✔
490
        local_path, local_mode, socket.SOCK_DGRAM
491
    )
492
    return await get_async_backend().create_unix_datagram_socket(raw_socket, None)
8✔
493

494

495
async def create_connected_unix_datagram_socket(
11✔
496
    remote_path: str | bytes | PathLike[Any],
497
    *,
498
    local_path: None | str | bytes | PathLike[Any] = None,
499
    local_mode: int | None = None,
500
) -> ConnectedUNIXDatagramSocket:
501
    """
502
    Create a connected UNIX datagram socket.
503

504
    Connected datagram sockets can only communicate with the specified remote path.
505

506
    If ``local_path`` has been given, the socket will be bound to this path, making
507
    this socket suitable for receiving datagrams from other processes. Other processes
508
    can send datagrams to this socket only if ``local_path`` is set.
509

510
    If a socket already exists on the file system in the ``local_path``, it will be
511
    removed first.
512

513
    :param remote_path: the path to set as the default target
514
    :param local_path: the path on which to bind to
515
    :param local_mode: permissions to set on the local socket
516
    :return: a connected UNIX datagram socket
517

518
    """
519
    remote_path = os.fspath(remote_path)
8✔
520
    raw_socket = await setup_unix_local_socket(
8✔
521
        local_path, local_mode, socket.SOCK_DGRAM
522
    )
523
    return await get_async_backend().create_unix_datagram_socket(
8✔
524
        raw_socket, remote_path
525
    )
526

527

528
async def getaddrinfo(
11✔
529
    host: bytes | str | None,
530
    port: str | int | None,
531
    *,
532
    family: int | AddressFamily = 0,
533
    type: int | SocketKind = 0,
534
    proto: int = 0,
535
    flags: int = 0,
536
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
537
    """
538
    Look up a numeric IP address given a host name.
539

540
    Internationalized domain names are translated according to the (non-transitional)
541
    IDNA 2008 standard.
542

543
    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
544
        (host, port), unlike what :func:`socket.getaddrinfo` does.
545

546
    :param host: host name
547
    :param port: port number
548
    :param family: socket family (`'AF_INET``, ...)
549
    :param type: socket type (``SOCK_STREAM``, ...)
550
    :param proto: protocol number
551
    :param flags: flags to pass to upstream ``getaddrinfo()``
552
    :return: list of tuples containing (family, type, proto, canonname, sockaddr)
553

554
    .. seealso:: :func:`socket.getaddrinfo`
555

556
    """
557
    # Handle unicode hostnames
558
    if isinstance(host, str):
11✔
559
        try:
11✔
560
            encoded_host: bytes | None = host.encode("ascii")
11✔
561
        except UnicodeEncodeError:
10✔
562
            import idna
10✔
563

564
            encoded_host = idna.encode(host, uts46=True)
10✔
565
    else:
566
        encoded_host = host
×
567

568
    gai_res = await get_async_backend().getaddrinfo(
11✔
569
        encoded_host, port, family=family, type=type, proto=proto, flags=flags
570
    )
571
    return [
11✔
572
        (family, type, proto, canonname, convert_ipv6_sockaddr(sockaddr))
573
        for family, type, proto, canonname, sockaddr in gai_res
574
    ]
575

576

577
def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
11✔
578
    """
579
    Look up the host name of an IP address.
580

581
    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
582
    :param flags: flags to pass to upstream ``getnameinfo()``
583
    :return: a tuple of (host name, service name)
584

585
    .. seealso:: :func:`socket.getnameinfo`
586

587
    """
588
    return get_async_backend().getnameinfo(sockaddr, flags)
10✔
589

590

591
def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
11✔
592
    """
593
    Wait until the given socket has data to be read.
594

595
    This does **NOT** work on Windows when using the asyncio backend with a proactor
596
    event loop (default on py3.8+).
597

598
    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
599
        level constructs like socket streams!
600

601
    :param sock: a socket object
602
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
603
        socket to become readable
604
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
605
        to become readable
606

607
    """
608
    return get_async_backend().wait_socket_readable(sock)
×
609

610

611
def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
11✔
612
    """
613
    Wait until the given socket can be written to.
614

615
    This does **NOT** work on Windows when using the asyncio backend with a proactor
616
    event loop (default on py3.8+).
617

618
    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
619
        level constructs like socket streams!
620

621
    :param sock: a socket object
622
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
623
        socket to become writable
624
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
625
        to become writable
626

627
    """
628
    return get_async_backend().wait_socket_writable(sock)
×
629

630

631
#
632
# Private API
633
#
634

635

636
def convert_ipv6_sockaddr(
11✔
637
    sockaddr: tuple[str, int, int, int] | tuple[str, int],
638
) -> tuple[str, int]:
639
    """
640
    Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.
641

642
    If the scope ID is nonzero, it is added to the address, separated with ``%``.
643
    Otherwise the flow id and scope id are simply cut off from the tuple.
644
    Any other kinds of socket addresses are returned as-is.
645

646
    :param sockaddr: the result of :meth:`~socket.socket.getsockname`
647
    :return: the converted socket address
648

649
    """
650
    # This is more complicated than it should be because of MyPy
651
    if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
11✔
652
        host, port, flowinfo, scope_id = sockaddr
11✔
653
        if scope_id:
11✔
654
            # PyPy (as of v7.3.11) leaves the interface name in the result, so
655
            # we discard it and only get the scope ID from the end
656
            # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
657
            host = host.split("%")[0]
8✔
658

659
            # Add scope_id to the address
660
            return f"{host}%{scope_id}", port
8✔
661
        else:
662
            return host, port
11✔
663
    else:
664
        return sockaddr
11✔
665

666

667
async def setup_unix_local_socket(
11✔
668
    path: None | str | bytes | PathLike[Any],
669
    mode: int | None,
670
    socktype: int,
671
) -> socket.socket:
672
    """
673
    Create a UNIX local socket object, deleting the socket at the given path if it
674
    exists.
675

676
    Not available on Windows.
677

678
    :param path: path of the socket
679
    :param mode: permissions to set on the socket
680
    :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
681

682
    """
683
    path_str: str | bytes | None
684
    if path is not None:
8✔
685
        path_str = os.fspath(path)
8✔
686
        is_abstract = (
8✔
687
            path_str.startswith(b"\0")
688
            if isinstance(path_str, bytes)
689
            else path_str.startswith("\0")
690
        )
691

692
        if is_abstract:
8✔
693
            # Unix abstract namespace socket. No file backing so skip stat call
694
            pass
6✔
695
        else:
696
            # Copied from pathlib...
697
            try:
8✔
698
                stat_result = os.stat(path)
8✔
699
            except OSError as e:
8✔
700
                if e.errno not in (
8✔
701
                    errno.ENOENT,
702
                    errno.ENOTDIR,
703
                    errno.EBADF,
704
                    errno.ELOOP,
705
                ):
NEW
706
                    raise
×
707
            else:
708
                if stat.S_ISSOCK(stat_result.st_mode):
8✔
709
                    os.unlink(path)
8✔
710
    else:
711
        path_str = None
8✔
712

713
    raw_socket = socket.socket(socket.AF_UNIX, socktype)
8✔
714
    raw_socket.setblocking(False)
8✔
715

716
    if path_str is not None:
8✔
717
        try:
8✔
718
            await to_thread.run_sync(raw_socket.bind, path_str, abandon_on_cancel=True)
8✔
719
            if mode is not None:
8✔
720
                await to_thread.run_sync(chmod, path_str, mode, abandon_on_cancel=True)
×
721
        except BaseException:
×
722
            raw_socket.close()
×
723
            raise
×
724

725
    return raw_socket
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc