• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #38177

14 Aug 2025 11:50AM UTC coverage: 78.503% (+0.7%) from 77.785%
#38177

push

local

web-flow
Narrow drive letter matching for `splitdrive` on Windows to one character, fixup docstring for `joinpath` (#58951)

Fixes #58929

Only a single letter followed by a colon is a valid drive prefix for
this format; a string of any other length before the colon is not a
valid drive on Windows:
https://learn.microsoft.com/en-us/dotnet/standard/io/file-path-formats

Also the docstring for `joinpath` rendered Windows paths as unescaped,
meaning they could not be copied and pasted on the REPL.

48594 of 61901 relevant lines covered (78.5%)

8669158.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.57
/stdlib/Sockets/src/Sockets.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
4
Support for sockets. Provides [`IPAddr`](@ref) and subtypes, [`TCPSocket`](@ref), and [`UDPSocket`](@ref).
5
"""
6
module Sockets
7

8
export
9
    accept,
10
    bind,
11
    connect,
12
    getaddrinfo,
13
    getalladdrinfo,
14
    getnameinfo,
15
    getipaddr,
16
    getipaddrs,
17
    islinklocaladdr,
18
    getpeername,
19
    getsockname,
20
    listen,
21
    listenany,
22
    recv,
23
    recvfrom,
24
    send,
25
    join_multicast_group,
26
    leave_multicast_group,
27
    TCPSocket,
28
    UDPSocket,
29
    @ip_str,
30
    IPAddr,
31
    IPv4,
32
    IPv6
33

34
import Base: isless, show, print, parse, bind, alloc_buf_hook, _uv_hook_close
35

36
using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize,
37
    notify_error, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError,
38
    eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive,
39
    preserve_handle, unpreserve_handle, iolock_begin, iolock_end,
40
    uv_status_string, check_open, OS_HANDLE, RawFD,
41
    UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL,
42
    UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS,
43
    UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL,
44
    UV_EAI_FAMILY, UV_EAI_NODATA, UV_EAI_NONAME,
45
    UV_EAI_OVERFLOW, UV_EAI_PROTOCOL, UV_EAI_SERVICE,
46
    UV_EAI_SOCKTYPE, UV_EAI_MEMORY
47

48
include("IPAddr.jl")
49
include("addrinfo.jl")
50

51
"""
52
    TCPSocket(; delay=true)
53

54
Open a TCP socket using libuv. If `delay` is true, libuv delays creation of the
55
socket's file descriptor till the first [`bind`](@ref) call. `TCPSocket` has various
56
fields to denote the state of the socket as well as its send/receive buffers.
57
"""
58
mutable struct TCPSocket <: LibuvStream
    handle::Ptr{Cvoid}                  # raw pointer passed to the libuv calls in this file
    status::Int                         # one of the `Status*` constants imported from Base
    buffer::IOBuffer                    # starts out as a fresh `PipeBuffer()`
    cond::Base.ThreadSynchronizer       # locked/notified by the connect callback and waiters
    readerror::Any                      # `nothing` until an error is recorded
    sendbuf::Union{IOBuffer, Nothing}   # `nothing` on construction
    lock::ReentrantLock # advisory lock
    throttle::Int                       # initialised to `Base.DEFAULT_READ_BUFFER_SZ`

    # Wrap an existing handle pointer: link the pointer to the new Julia object
    # and register `uvfinalize` as its finalizer.
    function TCPSocket(handle::Ptr{Cvoid}, status)
        sock = new(handle, status, PipeBuffer(), Base.ThreadSynchronizer(),
                   nothing, nothing, ReentrantLock(), Base.DEFAULT_READ_BUFFER_SZ)
        associate_julia_struct(sock.handle, sock)
        finalizer(uvfinalize, sock)
        return sock
    end
end
83

84
# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
85
function TCPSocket(; delay=true)
    # Allocate the libuv handle first; the inner constructor registers the
    # finalizer that is responsible for it from here on.
    tcp = TCPSocket(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
    af_spec = delay ? 0 : 2   # AF_UNSPEC is 0, AF_INET is 2
    iolock_begin()
    try
        err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
                    eventloop(), tcp.handle, af_spec)
        uv_error("failed to create tcp socket", err)
        tcp.status = StatusInit
    finally
        # Release the io lock even when `uv_error` throws; previously a failed
        # initialisation left the lock held.
        iolock_end()
    end
    return tcp
end
96

97
# Wrap an already-open OS handle in a `TCPSocket` via `uv_tcp_open`.
function TCPSocket(fd::OS_HANDLE)
    tcp = TCPSocket()
    iolock_begin()
    try
        err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), tcp.handle, fd)
        uv_error("tcp_open", err)
        tcp.status = StatusOpen
    finally
        # Always drop the io lock, including when `uv_error` throws.
        iolock_end()
    end
    return tcp
end
if OS_HANDLE != RawFD
    # Where `RawFD` and `OS_HANDLE` are distinct types, translate the descriptor
    # with `Libc._get_osfhandle` and reuse the method above.
    TCPSocket(fd::RawFD) = TCPSocket(Libc._get_osfhandle(fd))
end
109

110
# `Base.fd` for TCP sockets simply forwards to `Base._fd`.
Base.fd(sock::TCPSocket) = Base._fd(sock)
×
111

112

113
mutable struct TCPServer <: LibuvServer
    handle::Ptr{Cvoid}              # raw pointer passed to the libuv calls in this file
    status::Int                     # one of the `Status*` constants imported from Base
    cond::Base.ThreadSynchronizer   # notified by the connection callback

    # Wrap an existing handle pointer: link the pointer to the new Julia object
    # and register `uvfinalize` as its finalizer.
    function TCPServer(handle::Ptr{Cvoid}, status)
        server = new(handle, status, Base.ThreadSynchronizer())
        associate_julia_struct(server.handle, server)
        finalizer(uvfinalize, server)
        return server
    end
end
128

129
# Keyword arg "delay": if true, libuv delays creation of socket fd till bind.
130
# It can be set to false if there is a need to set socket options before
131
# further calls to `bind` and `listen`, e.g. `SO_REUSEPORT`.
132
function TCPServer(; delay=true)
    tcp = TCPServer(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
    af_spec = delay ? 0 : 2   # AF_UNSPEC is 0, AF_INET is 2
    iolock_begin()
    try
        err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
                    eventloop(), tcp.handle, af_spec)
        uv_error("failed to create tcp server", err)
        tcp.status = StatusInit
    finally
        # Release the io lock even when `uv_error` throws.
        iolock_end()
    end
    return tcp
end
143

144
# `Base.fd` for TCP servers simply forwards to `Base._fd`.
Base.fd(server::TCPServer) = Base._fd(server)
×
145

146
"""
147
    accept(server[, client])
148

149
Accepts a connection on the given server and returns a connection to the client. An
150
uninitialized client stream may be provided, in which case it will be used instead of
151
creating a new stream.
152
"""
153
accept(server::TCPServer) = accept(server, TCPSocket())  # no client supplied: accept into a fresh TCPSocket
1,188✔
154

155
# Run an accept loop in a background task, passing every accepted client to
# `callback`. The task is returned; the caller is responsible for checking it
# for errors.
function accept(callback, server::LibuvServer)
    return @async begin
        try
            while true
                callback(accept(server))
            end
        catch ex
            # `accept` may explicitly throw UV_ECONNABORTED. That is the expected
            # way out once the server is no longer open, so only that
            # combination is swallowed; everything else is rethrown.
            aborted = ex isa IOError && ex.code == UV_ECONNABORTED
            if !aborted || isopen(server)
                rethrow()
            end
        end
    end
end
170

171

172
# UDP
173
"""
174
    UDPSocket()
175

176
Open a UDP socket using libuv. `UDPSocket` has various
177
fields to denote the state of the socket.
178
"""
179
mutable struct UDPSocket <: LibuvStream
    handle::Ptr{Cvoid}                     # raw pointer passed to the libuv calls in this file
    status::Int                            # one of the `Status*` constants imported from Base
    recvnotify::Base.ThreadSynchronizer    # receivers wait here; shares its lock with `cond`
    cond::Base.ThreadSynchronizer

    # Wrap an existing handle pointer. `recvnotify` is built on `cond.lock`, so
    # both synchronizers are guarded by the same lock.
    function UDPSocket(handle::Ptr{Cvoid}, status)
        shared = Base.ThreadSynchronizer()
        receivers = Base.ThreadSynchronizer(shared.lock)
        udp = new(handle, status, receivers, shared)
        associate_julia_struct(udp.handle, udp)
        finalizer(uvfinalize, udp)
        return udp
    end
end
193
function UDPSocket()
    this = UDPSocket(Libc.malloc(Base._sizeof_uv_udp), StatusUninit)
    iolock_begin()
    try
        err = ccall(:uv_udp_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}),
                    eventloop(), this.handle)
        uv_error("failed to create udp socket", err)
        this.status = StatusInit
    finally
        # Release the io lock even when `uv_error` throws.
        iolock_end()
    end
    return this
end
203

204
# Print the concrete type followed by the status string in parentheses.
show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")")
1✔
205

206
# `Base.fd` for UDP sockets simply forwards to `Base._fd`.
Base.fd(sock::UDPSocket) = Base._fd(sock)
×
207

208
# Close hook for UDP sockets: mark the socket closed, wake tasks waiting on
# `cond`, and fail tasks waiting in `recvnotify` with an `EOFError`.
function _uv_hook_close(sock::UDPSocket)
    Base.@lock sock.cond begin
        sock.status = StatusClosed
        notify(sock.cond)
        notify_error(sock.recvnotify, EOFError())
    end
    nothing
end
219

220
# Disables dual stack mode.
221
const UV_TCP_IPV6ONLY = 1
222

223
# Disables dual stack mode. Only available when using ipv6 bind
224
const UV_UDP_IPV6ONLY = 1
225

226
# Indicates message was truncated because read buffer was too small. The
227
# remainder was discarded by the OS.
228
const UV_UDP_PARTIAL = 2
229

230
# Indicates if SO_REUSEADDR will be set when binding the handle in uv_udp_bind. This sets
231
# the SO_REUSEPORT socket flag on the BSDs and OS X. On other Unix platforms, it sets the
232
# SO_REUSEADDR flag. What that means is that multiple threads or processes can bind to the
233
# same address without error (provided they all set the flag) but only the last one to bind
234
# will receive any traffic, in effect "stealing" the port from the previous listener.
235
const UV_UDP_REUSEADDR = 4
236

237
##
238

239
# Low-level TCP bind. Host and port are converted with `hton` before being
# handed to `jl_tcp_bind`; the raw Int32 status is returned unchecked.
function _bind(sock::Union{TCPServer, TCPSocket}, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
    addr_ref = Ref(hton(host.host))
    is_ipv6 = host isa IPv6
    return ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
                 sock, hton(port), addr_ref, flags, is_ipv6)
end
244

245
# Low-level UDP bind. Host and port are converted with `hton` before being
# handed to `jl_udp_bind`; the raw Int32 status is returned unchecked.
function _bind(sock::UDPSocket, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
    addr_ref = Ref(hton(host.host))
    is_ipv6 = host isa IPv6
    return ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
                 sock, hton(port), addr_ref, flags, is_ipv6)
end
250

251
"""
252
    bind(socket::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only=false, reuseaddr=false, kws...)
253

254
Bind `socket` to the given `host:port`. Note that `0.0.0.0` will listen on all devices.
255

256
* The `ipv6only` parameter disables dual stack mode. If `ipv6only=true`, only an IPv6 stack is created.
257
* If `reuseaddr=true`, multiple threads or processes can bind to the same address without error
258
  if they all set `reuseaddr=true`, but only the last to bind will receive any traffic.
259
"""
260
function bind(sock::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only = false, reuseaddr = false, kws...)
    if sock.status != StatusInit
        error("$(typeof(sock)) is not in initialization state")
    end
    flags = 0
    if isa(host, IPv6) && ipv6only
        flags |= isa(sock, UDPSocket) ? UV_UDP_IPV6ONLY : UV_TCP_IPV6ONLY
    end
    if isa(sock, UDPSocket) && reuseaddr
        flags |= UV_UDP_REUSEADDR
    end
    iolock_begin()
    try
        # Everything that can throw (`UInt16(port)` for an out-of-range port,
        # `_bind` dispatch for an unsupported `IPAddr`, `setopt`) runs inside
        # the `try` so the io lock is always released.
        err = _bind(sock, host, UInt16(port), UInt32(flags))
        if err < 0
            if err != UV_EADDRINUSE && err != UV_EACCES && err != UV_EADDRNOTAVAIL
                #TODO: this codepath is not currently tested
                throw(_UVError("bind", err))
            end
            # Address in use / access denied / address not available are
            # reported to the caller as `false` rather than as an exception.
            return false
        end
        if isa(sock, TCPServer) || isa(sock, UDPSocket)
            sock.status = StatusOpen
        end
        # Remaining keyword arguments are UDP socket options.
        isa(sock, UDPSocket) && setopt(sock; kws...)
    finally
        iolock_end()
    end
    return true
end
289

290
# Convenience method: unpack an `InetAddr` into its host and port.
bind(sock::TCPServer, addr::InetAddr) = bind(sock, addr.host, addr.port)
908✔
291

292
"""
293
    setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
294

295
Set UDP socket options.
296

297
* `multicast_loop`: loopback for multicast packets (default: `true`).
298
* `multicast_ttl`: TTL for multicast packets (default: `nothing`).
299
* `enable_broadcast`: flag must be set to `true` if socket will be used for broadcast
300
  messages, or else the UDP system will return an access error (default: `false`).
301
* `ttl`: Time-to-live of packets sent on the socket (default: `nothing`).
302
"""
303
function setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
    iolock_begin()
    try
        if sock.status == StatusUninit
            error("Cannot set options on uninitialized socket")
        end
        # Each option is applied only when it was supplied; `uv_error` turns a
        # negative libuv status into an exception.
        if multicast_loop !== nothing
            # Pass the raw status code. The previous `ccall(...) < 0` handed
            # `uv_error` a Bool, which is never negative, so a failure to set
            # this option was silently ignored.
            uv_error("multicast_loop", ccall(:uv_udp_set_multicast_loop, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_loop))
        end
        if multicast_ttl !== nothing
            uv_error("multicast_ttl", ccall(:uv_udp_set_multicast_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_ttl))
        end
        if enable_broadcast !== nothing
            uv_error("enable_broadcast", ccall(:uv_udp_set_broadcast, Cint, (Ptr{Cvoid}, Cint), sock.handle, enable_broadcast))
        end
        if ttl !== nothing
            uv_error("ttl", ccall(:uv_udp_set_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, ttl))
        end
    finally
        # Release the io lock on every path, including the error paths above.
        iolock_end()
    end
    nothing
end
323

324
"""
325
    recv(socket::UDPSocket)
326

327
Read a UDP packet from the specified socket, and return the bytes received. This call blocks.
328
"""
329
function recv(sock::UDPSocket)
    # `recvfrom` yields `(sender, payload)`; only the payload is returned here.
    _, payload = recvfrom(sock)
    return payload
end
333

334
# Forward declaration: `recvfrom` takes `@cfunction(uv_recvcb, ...)` before the
# method itself is defined further down in this file.
function uv_recvcb end
335

336
"""
337
    recvfrom(socket::UDPSocket) -> (host_port, data)
338

339
Read a UDP packet from the specified socket, returning a tuple of `(host_port, data)`, where
340
`host_port` will be an InetAddr{IPv4} or InetAddr{IPv6}, as appropriate.
341

342
!!! compat "Julia 1.3"
343
    Prior to Julia version 1.3, the first returned value was an address (`IPAddr`).
344
    In version 1.3 it was changed to an `InetAddr`.
345
"""
346
function recvfrom(sock::UDPSocket)
    iolock_begin()
    try
        # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
        if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive
            error("UDPSocket is not initialized and open")
        end
        # Start receiving only if the handle is not already active.
        if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
            err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                        sock,
                        @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
                        @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
            uv_error("recv_start", err)
        end
        sock.status = StatusActive
        # Take the notification lock before the io lock is dropped, as before.
        lock(sock.recvnotify)
    finally
        # Previously the two error paths above left the io lock held.
        iolock_end()
    end
    try
        From = Union{InetAddr{IPv4}, InetAddr{IPv6}}
        Data = Vector{UInt8}
        # `uv_recvcb` notifies with a `(sender, payload)` tuple.
        from, data = wait(sock.recvnotify)::Tuple{From, Data}
        return (from, data)
    finally
        unlock(sock.recvnotify)
    end
end
371

372
# Buffer allocation hook for UDP sockets: malloc `size` bytes and return the
# pointer together with the size as an `Int`.
alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), Int(size)) # size is always 64k from libuv
×
373

374
# Receive callback registered by `recvfrom`. Depending on the outcome it frees
# the buffer, fails the waiting receivers, or hands one receiver a
# `(sender, payload)` tuple; receiving is stopped once nobody is waiting.
function uv_recvcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint)
    sock = @handle_as handle UDPSocket
    Base.@lock sock.recvnotify begin
        base = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf)
        if nread == 0 && addr == C_NULL
            # nothing read and no sender address: just drop the buffer
            Libc.free(base)
        elseif nread < 0
            Libc.free(base)
            notify_error(sock.recvnotify, _UVError("recv", nread))
        elseif flags & UV_UDP_PARTIAL > 0
            Libc.free(base)
            notify_error(sock.recvnotify, "Partial message received")
        else
            capacity = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf))
            if capacity - nread < 16384 # waste at most 16k (note: buf_size is currently always 64k)
                # take ownership of the malloc'd buffer instead of copying it
                payload = unsafe_wrap(Array, base, nread, own=true)
            else
                payload = Vector{UInt8}(undef, nread)
                GC.@preserve payload unsafe_copyto!(pointer(payload), base, nread)
                Libc.free(base)
            end
            # need to check the address type in order to convert to a Julia IPAddr
            host = IPv4(0)
            port = UInt16(0)
            if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1
                host = IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr)))
                port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr))
            elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1
                raw = Ref{UInt128}(0)
                # the UInt32 return value is not used here
                ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, raw)
                host = IPv6(ntoh(raw[]))
                port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr))
            end
            sender = InetAddr(host, port)
            # wake exactly one waiting receiver
            notify(sock.recvnotify, (sender, payload), all=false)
        end
        if sock.status == StatusActive && isempty(sock.recvnotify)
            sock.status = StatusOpen
            ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock)
        end
    end
    nothing
end
420

421
# Queue a UDP send with libuv and return the malloc'd request pointer. On
# failure the request is freed here and `uv_error` throws.
function _send_async(sock::UDPSocket, ipaddr::Union{IPv4, IPv6}, port::UInt16, buf)
    request = Libc.malloc(Base._sizeof_uv_udp_send)
    uv_req_set_data(request, C_NULL) # in case we get interrupted before arriving at the wait call
    addr_ref = Ref(hton(ipaddr.host))
    is_ipv6 = ipaddr isa IPv6
    write_cb = @cfunction(Base.uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint))
    status = ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}, Cint),
                   request, sock, hton(port), addr_ref, buf, sizeof(buf), write_cb, is_ipv6)
    if status < 0
        Libc.free(request)
        uv_error("send", status)
    end
    return request
end
435

436
"""
437
    send(socket::UDPSocket, host::IPAddr, port::Integer, msg)
438

439
Send `msg` over `socket` to `host:port`.
440
"""
441
function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg)
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
    iolock_begin()
    local uvw
    try
        if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive
            error("UDPSocket is not initialized and open")
        end
        uvw = _send_async(sock, ipaddr, UInt16(port), msg)
    catch
        # Nothing has been queued yet, so just release the io lock; previously
        # a bad status, an out-of-range port or a failed send left it held.
        iolock_end()
        rethrow()
    end
    ct = current_task()
    preserve_handle(ct)
    Base.sigatomic_begin()
    # Attach the current task to the request, then wait below for the Cint
    # status delivered to this task.
    uv_req_set_data(uvw, ct)
    iolock_end()
    status = try
        Base.sigatomic_end()
        wait()::Cint
    finally
        Base.sigatomic_end()
        iolock_begin()
        # `IntrusiveLinkedList` is qualified with `Base.` because it is not among
        # the names this module imports from Base.
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::Base.IntrusiveLinkedList{Task}, ct)
        if uv_req_data(uvw) != C_NULL
            # uvw is still alive,
            # so make sure we won't get spurious notifications later
            uv_req_set_data(uvw, C_NULL)
        else
            # done with uvw
            Libc.free(uvw)
        end
        iolock_end()
        unpreserve_handle(ct)
    end
    uv_error("send", status)
    nothing
end
474

475

476
#from `connect`
477
# Connect callback registered by `connect!`. Records the outcome on the
# socket, wakes tasks waiting on `sock.cond`, and frees the connect request.
function uv_connectcb(conn::Ptr{Cvoid}, status::Cint)
    hand = ccall(:jl_uv_connect_handle, Ptr{Cvoid}, (Ptr{Cvoid},), conn)
    sock = @handle_as hand LibuvStream
    Base.@lock sock.cond begin
        if status >= 0 # success
            # leave sockets that are already closing/closed untouched
            if sock.status != StatusClosed && sock.status != StatusClosing
                sock.status = StatusOpen
            end
        else
            sock.readerror = _UVError("connect", status) # TODO: perhaps we should not reuse readerror for this
            if sock.status != StatusClosed && sock.status != StatusClosing
                ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand)
                sock.status = StatusClosing
            end
        end
        notify(sock.cond)
    end
    Libc.free(conn)
    nothing
end
500

501
# Start an asynchronous TCP connection to `host:port`. The socket must be in
# `StatusInit`; on success it moves to `StatusConnecting` and `uv_connectcb`
# later records the outcome. Throws `ArgumentError` for a port outside 0:65535.
function connect!(sock::TCPSocket, host::Union{IPv4, IPv6}, port::Integer)
    iolock_begin()
    try
        if sock.status != StatusInit
            error("TCPSocket is not in initialization state")
        end
        if !(0 <= port <= typemax(UInt16))
            throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port"))
        end
        host_in = Ref(hton(host.host))
        uv_error("connect", ccall(:jl_tcp_connect, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cint),
                                  sock, host_in, hton(UInt16(port)), @cfunction(uv_connectcb, Cvoid, (Ptr{Cvoid}, Cint)),
                                  host isa IPv6))
        sock.status = StatusConnecting
    finally
        # Previously every validation failure above was thrown with the io
        # lock still held; release it on all paths.
        iolock_end()
    end
    nothing
end
517

518
# Convenience method: unpack an `InetAddr` into its host and port.
connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port)
6✔
519

520
# Block until `x` has left `StatusConnecting`. Throws the stored `readerror`
# if the stream ended up not open and an error was recorded.
function wait_connected(x::LibuvStream)
    iolock_begin()
    # Tracks whether this task currently holds the io lock, so the outer
    # `finally` releases it exactly when needed. Previously a throw from
    # `check_open` or `throw(x.readerror)` skipped `iolock_end()`.
    holding_iolock = true
    try
        check_open(x)
        isopen(x) || x.readerror === nothing || throw(x.readerror)
        preserve_handle(x)
        lock(x.cond)
        try
            while x.status == StatusConnecting
                # Drop the io lock while sleeping on the condition, then
                # reacquire both locks in the same order as at entry
                # (io lock first, then `x.cond`).
                iolock_end()
                holding_iolock = false
                wait(x.cond)
                unlock(x.cond)
                iolock_begin()
                holding_iolock = true
                lock(x.cond)
            end
            isopen(x) || x.readerror === nothing || throw(x.readerror)
        finally
            unlock(x.cond)
            unpreserve_handle(x)
        end
    finally
        holding_iolock && iolock_end()
    end
    nothing
end
542

543
# Default Host to localhost
544

545
"""
546
    connect([host], port::Integer) -> TCPSocket
547

548
Connect to the host `host` on port `port`.
549
"""
550
connect(sock::TCPSocket, port::Integer) = connect(sock, localhost, port)  # host defaults to `localhost`
×
551
connect(port::Integer) = connect(localhost, port)  # host defaults to `localhost`
634✔
552

553
# Valid connect signatures for TCP
554
# Host given as a string: connect a new `TCPSocket` (the `connect!` method for
# strings resolves the name with `getaddrinfo`).
connect(host::AbstractString, port::Integer) = connect(TCPSocket(), host, port)
1✔
555
# Host given as an IP address: connect a new `TCPSocket`.
connect(addr::IPAddr, port::Integer) = connect(TCPSocket(), addr, port)
645✔
556
# Host and port given together as an `InetAddr`: connect a new `TCPSocket`.
connect(addr::InetAddr) = connect(TCPSocket(), addr)
4✔
557

558
# Resolve `host` with `getaddrinfo` and start connecting `sock` to it.
# The socket must still be in `StatusInit`.
function connect!(sock::TCPSocket, host::AbstractString, port::Integer)
    sock.status == StatusInit || error("TCPSocket is not in initialization state")
    resolved = getaddrinfo(host)
    connect!(sock, resolved, port)
    return sock
end
566

567
# Generic blocking connect: start the connection with `connect!`, wait for it
# to complete, and return the stream.
function connect(sock::LibuvStream, args...)
    connect!(sock, args...)   # kicks off the asynchronous connection attempt
    wait_connected(sock)      # returns once the stream has left StatusConnecting
    return sock
end
572

573
"""
574
    nagle(socket::Union{TCPServer, TCPSocket}, enable::Bool)
575

576
Nagle's algorithm batches multiple small TCP packets into larger
577
ones. This can improve throughput but worsen latency. Nagle's algorithm
578
is enabled by default. This function sets whether Nagle's algorithm is
579
active on a given TCP server or socket. The opposite option is called
580
`TCP_NODELAY` in other languages.
581

582
!!! compat "Julia 1.3"
583
    This function requires Julia 1.3 or later.
584
"""
585
function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool)
    # disable or enable Nagle's algorithm on all OSes
    iolock_begin()
    try
        check_open(sock)
        # `uv_tcp_nodelay` takes the opposite flag, hence `!enable`.
        # The raw libuv status code is returned to the caller unchecked.
        # TODO: check err
        return ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable))
    finally
        # Release the io lock even when `check_open` throws.
        iolock_end()
    end
end
594

595
"""
596
    quickack(socket::Union{TCPServer, TCPSocket}, enable::Bool)
597

598
On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`.
599
"""
600
function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool)
    iolock_begin()
    try
        check_open(sock)
        @static if Sys.islinux()
            # tcp_quickack is a linux only option
            if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0
                # Best effort: a failure is only logged (once), never thrown.
                @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
            end
        end
    finally
        # Release the io lock even when `check_open` throws.
        iolock_end()
    end
    nothing
end
612

613

614
##
615

616
# Default `backlog` used by `listen`, `trylisten` and `listenany`.
const BACKLOG_DEFAULT = 511
617

618
"""
619
    listen([addr, ]port::Integer; backlog::Integer=BACKLOG_DEFAULT) -> TCPServer
620

621
Listen on port on the address specified by `addr`.
622
By default this listens on `localhost` only.
623
To listen on all interfaces pass `IPv4(0)` or `IPv6(0)` as appropriate.
624
`backlog` determines how many connections can be pending (not having
625
called [`accept`](@ref)) before the server will begin to
626
reject them. The default value of `backlog` is 511.
627
"""
628
function listen(addr; backlog::Integer=BACKLOG_DEFAULT)
    sock = TCPServer()
    try
        bind(sock, addr) || error("cannot bind to port; may already be in use or access denied")
        listen(sock; backlog=backlog)
    catch
        # Do not leave the freshly created server handle open when binding or
        # listening fails; `listenany` closes its failed candidates the same way.
        close(sock)
        rethrow()
    end
    return sock
end
634
# Port only: listen on `localhost`.
listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(localhost, port; backlog=backlog)
2✔
635
# Host and port: combine them into an `InetAddr` and listen on that.
listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host, port); backlog=backlog)
4✔
636

637
# Start listening on an existing server; a negative status from `trylisten`
# is turned into an exception by `uv_error`.
function listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
    status = trylisten(sock; backlog=backlog)
    uv_error("listen", status)
    return sock
end
641

642
# from `listen`
643
# Connection callback registered by `trylisten`: wake tasks waiting on the
# server's condition, or fail them when libuv reports a negative status.
function uv_connectioncb(stream::Ptr{Cvoid}, status::Cint)
    sock = @handle_as stream LibuvServer
    Base.@lock sock.cond begin
        if status < 0
            notify_error(sock.cond, _UVError("connection", status))
        else
            notify(sock.cond)
        end
    end
    nothing
end
657

658
# Call `uv_listen` on `sock` and return the raw status code instead of
# throwing on failure. Throws only if `sock` is not open.
function trylisten(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
    iolock_begin()
    try
        check_open(sock)
        err = ccall(:uv_listen, Cint, (Ptr{Cvoid}, Cint, Ptr{Cvoid}),
                    sock, backlog, @cfunction(uv_connectioncb, Cvoid, (Ptr{Cvoid}, Cint)))
        # NOTE: the status is set regardless of `err`, as before; callers
        # inspect the returned code.
        sock.status = StatusActive
        return err
    finally
        # Release the io lock even when `check_open` or the `backlog`
        # conversion throws.
        iolock_end()
    end
end
667

668
##
669

670
# Try to accept a pending connection into `client` without blocking and
# return the raw `uv_accept` status; `client` becomes `StatusOpen` on success.
function accept_nonblock(server::TCPServer, client::TCPSocket)
    iolock_begin()
    try
        if client.status != StatusInit
            error("client TCPSocket is not in initialization state")
        end
        err = ccall(:uv_accept, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), server.handle, client.handle)
        if err == 0
            client.status = StatusOpen
        end
        return err
    finally
        # Release the io lock even when the state check above throws.
        iolock_end()
    end
end
682

683
# Non-blocking accept into a freshly created socket; a negative status is
# raised as an exception by `uv_error`.
function accept_nonblock(server::TCPServer)
    client = TCPSocket()
    status = accept_nonblock(server, client)
    uv_error("accept", status)
    return client
end
688

689
function accept(server::LibuvServer, client::LibuvStream)
    iolock_begin()
    # Tracks whether this task currently holds the io lock so that the outer
    # `finally` releases it exactly once on every exit path. Previously the
    # `ArgumentError`, `uv_error` and `isopen` failures left the lock held.
    holding_iolock = true
    try
        if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed
            throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
        end
        while isopen(server)
            err = accept_nonblock(server, client)
            if err == 0
                return client
            elseif err != UV_EAGAIN
                uv_error("accept", err)
            end
            # Nothing pending yet: sleep on the server condition until the
            # connection callback (or a close) wakes us, without the io lock.
            preserve_handle(server)
            lock(server.cond)
            iolock_end()
            holding_iolock = false
            try
                wait(server.cond)
            finally
                unlock(server.cond)
                unpreserve_handle(server)
            end
            iolock_begin()
            holding_iolock = true
        end
        # The server is no longer open: report it as an aborted connection.
        uv_error("accept", UV_ECONNABORTED)
    finally
        holding_iolock && iolock_end()
    end
    nothing
end
716

717
## Utility functions
718

719
# IPv4 loopback address, used as the default host by `connect`, `listen` and `listenany`.
const localhost = ip"127.0.0.1"
720

721
"""
722
    listenany([host::IPAddr,] port_hint; backlog::Integer=BACKLOG_DEFAULT) -> (UInt16, TCPServer)
723

724
Create a `TCPServer` on any port, using hint as a starting point. Returns a tuple of the
725
actual port that the server was created on and the server itself.
726
The backlog argument defines the maximum length to which the queue of pending connections for sockfd may grow.
727
"""
728
function listenany(host::IPAddr, default_port; backlog::Integer=BACKLOG_DEFAULT)
    addr = InetAddr(host, default_port)
    while true
        sock = TCPServer()
        if bind(sock, addr) && trylisten(sock; backlog) == 0
            # Port 0 lets the OS choose the port, so ask the socket which one
            # it got. This checks the port actually bound (`addr.port`) rather
            # than the hint: the counter below wraps from 65535 to 0, and
            # previously that case returned port 0 to the caller.
            if addr.port == 0
                _addr, port = getsockname(sock)
                return (port, sock)
            end
            return (addr.port, sock)
        end
        # This candidate failed: release it and try the next port number.
        close(sock)
        addr = InetAddr(addr.host, addr.port + UInt16(1))
        if addr.port == default_port
            # wrapped all the way around to the starting hint
            error("no ports available")
        end
    end
end
746

747
# Port hint only: search for a free port on `localhost`.
listenany(default_port; backlog::Integer=BACKLOG_DEFAULT) = listenany(localhost, default_port; backlog)
1,264✔
748

749
# Shared implementation behind `join_multicast_group` and
# `leave_multicast_group`: forwards to `uv_udp_set_membership`, passing a NULL
# interface when none was given, and throws via `uv_error` on failure.
function udp_set_membership(sock::UDPSocket, group_addr::String,
                            interface_addr::Union{Nothing, String}, operation)
    iface = interface_addr === nothing ? C_NULL : interface_addr
    status = ccall(:uv_udp_set_membership, Cint,
                   (Ptr{Cvoid}, Cstring, Cstring, Cint),
                   sock.handle, group_addr, iface, operation)
    uv_error("uv_udp_set_membership", status)
    return
end
760

761
"""
762
    join_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
763

764
Join a socket to a particular multicast group defined by `group_addr`.
765
If `interface_addr` is given, specifies a particular interface for multi-homed
766
systems.  Use `leave_multicast_group()` to disable reception of a group.
767
"""
768
function join_multicast_group(sock::UDPSocket, group_addr::String,
                              interface_addr::Union{Nothing, String} = nothing)
    # operation code 1 is what this file passes for "join"
    # (presumably libuv's UV_JOIN_GROUP — confirm against the libuv headers)
    return udp_set_membership(sock, group_addr, interface_addr, 1)
end
function join_multicast_group(sock::UDPSocket, group_addr::IPAddr,
                              interface_addr::Union{Nothing, IPAddr} = nothing)
    # Convert the addresses to strings and defer to the String method.
    iface = interface_addr === nothing ? nothing : string(interface_addr)
    return join_multicast_group(sock, string(group_addr), iface)
end
779

780
"""
781
    leave_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
782

783
Remove a socket from a particular multicast group defined by `group_addr`.
784
If `interface_addr` is given, specifies a particular interface for multi-homed
785
systems.  Use `join_multicast_group()` to enable reception of a group.
786
"""
787
function leave_multicast_group(sock::UDPSocket, group_addr::String,
                               interface_addr::Union{Nothing, String} = nothing)
    # operation code 0 is what this file passes for "leave"
    # (presumably libuv's UV_LEAVE_GROUP — confirm against the libuv headers)
    return udp_set_membership(sock, group_addr, interface_addr, 0)
end
function leave_multicast_group(sock::UDPSocket, group_addr::IPAddr,
                               interface_addr::Union{Nothing, IPAddr} = nothing)
    # Convert the addresses to strings and defer to the String method.
    iface = interface_addr === nothing ? nothing : string(interface_addr)
    return leave_multicast_group(sock, string(group_addr), iface)
end
798

799
"""
800
    getsockname(sock::Union{TCPServer, TCPSocket}) -> (IPAddr, UInt16)
801

802
Get the IP address and port that the given socket is bound to.
803
"""
804
getsockname(sock::Union{TCPSocket, TCPServer}) = _sockname(sock, true)  # `true` selects the local endpoint
646✔
805

806

807
"""
808
    getpeername(sock::TCPSocket) -> (IPAddr, UInt16)
809

810
Get the IP address and port of the remote endpoint that the given
811
socket is connected to. Valid only for connected TCP sockets.
812
"""
813
getpeername(sock::TCPSocket) = _sockname(sock, false)  # `false` selects the remote endpoint
22✔
814

815
# Shared implementation of `getsockname` (`self == true`) and `getpeername`
# (`self == false`). Returns `(addr, port)` with `addr` an `IPv4` or `IPv6`;
# throws if the name cannot be obtained or the address family is unsupported.
function _sockname(sock, self=true)
    # A socket still in `StatusInit` is allowed; anything else must be open.
    sock.status == StatusInit || check_open(sock)
    rport = Ref{Cushort}(0)
    raddress = zeros(UInt8, 16)   # large enough for an IPv6 address
    rfamily = Ref{Cuint}(0)

    iolock_begin()
    if self
        r = ccall(:jl_tcp_getsockname, Int32,
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
                sock.handle, rport, raddress, rfamily)
    else
        r = ccall(:jl_tcp_getpeername, Int32,
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
                sock.handle, rport, raddress, rfamily)
    end
    iolock_end()
    uv_error("cannot obtain socket name", r)
    port = ntoh(rport[])
    af_inet6 = @static if Sys.iswindows() # AF_INET6 in <sys/socket.h>
        23
    elseif Sys.isapple()
        30
    elseif Sys.KERNEL ∈ (:FreeBSD, :DragonFly)
        28
    elseif Sys.KERNEL ∈ (:NetBSD, :OpenBSD)
        24
    else
        10
    end

    # Read the address straight out of `raddress`, keeping the array rooted
    # with `GC.@preserve` while its raw pointer is dereferenced. Previously the
    # IPv4 path loaded from a temporary copy (`raddress[1:4]`) with nothing
    # keeping that copy alive.
    if rfamily[] == 2 # AF_INET
        naddr = GC.@preserve raddress ntoh(unsafe_load(Ptr{Cuint}(pointer(raddress)), 1))
        addr = IPv4(naddr)
    elseif rfamily[] == af_inet6
        naddr = GC.@preserve raddress ntoh(unsafe_load(Ptr{UInt128}(pointer(raddress)), 1))
        addr = IPv6(naddr)
    else
        error(string("unsupported address family: ", rfamily[]))
    end
    return addr, port
end
858

859
# domain sockets
860

861
include("PipeServer.jl")
862

863
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc