• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / 1391

29 Dec 2025 07:41PM UTC coverage: 76.638% (+0.02%) from 76.621%
1391

push

buildkite

web-flow
🤖 Bump StyledStrings stdlib 9bb8ffd → a033d46 (#60503)

Co-authored-by: KristofferC <1282691+KristofferC@users.noreply.github.com>

62409 of 81433 relevant lines covered (76.64%)

22974996.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.15
/stdlib/Sockets/src/Sockets.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
4
Support for sockets. Provides [`IPAddr`](@ref) and subtypes, [`TCPSocket`](@ref), and [`UDPSocket`](@ref).
5
"""
6
module Sockets
7

8
export
9
    accept,
10
    bind,
11
    connect,
12
    getaddrinfo,
13
    getalladdrinfo,
14
    getnameinfo,
15
    getipaddr,
16
    getipaddrs,
17
    islinklocaladdr,
18
    getpeername,
19
    getsockname,
20
    listen,
21
    listenany,
22
    recv,
23
    recvfrom,
24
    send,
25
    join_multicast_group,
26
    leave_multicast_group,
27
    TCPSocket,
28
    UDPSocket,
29
    @ip_str,
30
    IPAddr,
31
    IPv4,
32
    IPv6
33

34
import Base: isless, show, print, parse, bind, alloc_buf_hook, _uv_hook_close
35

36
using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize,
37
    notify_error, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError,
38
    eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive,
39
    preserve_handle, unpreserve_handle, iolock_begin, iolock_end,
40
    uv_status_string, check_open, OS_HANDLE, RawFD,
41
    UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL,
42
    UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS,
43
    UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL,
44
    UV_EAI_FAMILY, UV_EAI_NODATA, UV_EAI_NONAME,
45
    UV_EAI_OVERFLOW, UV_EAI_PROTOCOL, UV_EAI_SERVICE,
46
    UV_EAI_SOCKTYPE, UV_EAI_MEMORY
47

48
include("IPAddr.jl")
49
include("addrinfo.jl")
50

51
"""
52
    TCPSocket(; delay=true)
53

54
Open a TCP socket using libuv. If `delay` is true, libuv delays creation of the
55
socket's file descriptor till the first [`bind`](@ref) call. `TCPSocket` has various
56
fields to denote the state of the socket as well as its send/receive buffers.
57
"""
58
mutable struct TCPSocket <: LibuvStream
    handle::Ptr{Cvoid}                  # raw libuv handle memory
    status::Int                         # one of the Base `Status*` constants
    buffer::IOBuffer                    # read buffer (created as a `PipeBuffer`)
    cond::Base.ThreadSynchronizer       # notified by the libuv callbacks
    readerror::Any                      # `nothing`, or an error recorded by a callback
    sendbuf::Union{IOBuffer, Nothing}   # optional send buffer, initially `nothing`
    lock::ReentrantLock # advisory lock
    throttle::Int                       # starts at Base.DEFAULT_READ_BUFFER_SZ

    # Wrap an already-allocated libuv handle: fill in fresh buffers and
    # synchronization objects, link the handle back to the Julia object, and
    # register `uvfinalize` as its finalizer.
    function TCPSocket(handle::Ptr{Cvoid}, status)
        sock = new(handle, status,
                   PipeBuffer(),
                   Base.ThreadSynchronizer(),
                   nothing,
                   nothing,
                   ReentrantLock(),
                   Base.DEFAULT_READ_BUFFER_SZ)
        associate_julia_struct(sock.handle, sock)
        finalizer(uvfinalize, sock)
        return sock
    end
end
83

84
# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
85
function TCPSocket(; delay=true)
    # Allocate the libuv handle and wrap it before libuv has initialized it.
    sock = TCPSocket(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
    # AF_UNSPEC (0) defers creation of the fd until bind; AF_INET (2) creates it now.
    family = if delay
        0
    else
        2
    end
    iolock_begin()
    rc = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
               eventloop(), sock.handle, family)
    uv_error("failed to create tcp socket", rc)
    sock.status = StatusInit
    iolock_end()
    return sock
end
96

97
function TCPSocket(fd::OS_HANDLE)
    # Start from a freshly initialized socket, then attach the existing OS handle.
    sock = TCPSocket()
    iolock_begin()
    rc = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), sock.handle, fd)
    uv_error("tcp_open", rc)
    sock.status = StatusOpen
    iolock_end()
    return sock
end
106
if OS_HANDLE != RawFD
    # Where OS handles and C file descriptors are distinct types, translate the
    # descriptor to its OS handle and defer to the OS_HANDLE constructor.
    function TCPSocket(fd::RawFD)
        return TCPSocket(Libc._get_osfhandle(fd))
    end
end
109

110
function Base.fd(sock::TCPSocket)
    # Forward to Base's internal descriptor lookup.
    return Base._fd(sock)
end
111

112

113
mutable struct TCPServer <: LibuvServer
    handle::Ptr{Cvoid}              # raw libuv handle memory
    status::Int                     # one of the Base `Status*` constants
    cond::Base.ThreadSynchronizer   # notified by the connection callback

    # Wrap an already-allocated libuv handle, link it back to the Julia object,
    # and register `uvfinalize` as its finalizer.
    function TCPServer(handle::Ptr{Cvoid}, status)
        server = new(handle, status, Base.ThreadSynchronizer())
        associate_julia_struct(server.handle, server)
        finalizer(uvfinalize, server)
        return server
    end
end
128

129
# Keyword arg "delay": if true, libuv delays creation of socket fd till bind.
130
# It can be set to false if there is a need to set socket options before
131
# further calls to `bind` and `listen`, e.g. `SO_REUSEPORT`.
132
function TCPServer(; delay=true)
    server = TCPServer(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
    # AF_UNSPEC (0) defers creation of the fd until bind; AF_INET (2) creates it now.
    family = if delay
        0
    else
        2
    end
    iolock_begin()
    rc = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
               eventloop(), server.handle, family)
    uv_error("failed to create tcp server", rc)
    server.status = StatusInit
    iolock_end()
    return server
end
143

144
function Base.fd(server::TCPServer)
    # Forward to Base's internal descriptor lookup.
    return Base._fd(server)
end
145

146
"""
147
    accept(server[, client])
148

149
Accepts a connection on the given server and returns a connection to the client. An
150
uninitialized client stream may be provided, in which case it will be used instead of
151
creating a new stream.
152
"""
153
function accept(server::TCPServer)
    # No client stream supplied: accept into a freshly created socket.
    return accept(server, TCPSocket())
end
154

155
function accept(callback, server::LibuvServer)
    # Run the accept loop in a background task and hand every accepted
    # client to `callback`. The loop only ends by exception.
    acceptor = @async begin
        try
            while true
                callback(accept(server))
            end
        catch err
            # `accept` throws UV_ECONNABORTED once the server is no longer open;
            # that is the expected way out. Anything else, or an abort while the
            # server is still open, is propagated.
            aborted = err isa IOError && err.code == UV_ECONNABORTED
            if !aborted || isopen(server)
                rethrow()
            end
        end
    end
    return acceptor # caller is responsible for checking for errors
end
170

171

172
# UDP
173
"""
174
    UDPSocket()
175

176
Open a UDP socket using libuv. `UDPSocket` has various
177
fields to denote the state of the socket.
178
"""
179
mutable struct UDPSocket <: LibuvStream
    handle::Ptr{Cvoid}                    # raw libuv handle memory
    status::Int                           # one of the Base `Status*` constants
    recvnotify::Base.ThreadSynchronizer   # wakes tasks blocked in `recvfrom`
    cond::Base.ThreadSynchronizer         # general state-change notifications

    # Wrap an already-allocated libuv handle. `recvnotify` is built on the same
    # underlying lock as `cond`, so holding either guards both.
    function UDPSocket(handle::Ptr{Cvoid}, status)
        statecond = Base.ThreadSynchronizer()
        recvcond = Base.ThreadSynchronizer(statecond.lock)
        sock = new(handle, status, recvcond, statecond)
        associate_julia_struct(sock.handle, sock)
        finalizer(uvfinalize, sock)
        return sock
    end
end
193
function UDPSocket()
    # Allocate the libuv handle, wrap it, then let libuv initialize it.
    sock = UDPSocket(Libc.malloc(Base._sizeof_uv_udp), StatusUninit)
    iolock_begin()
    rc = ccall(:uv_udp_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}),
               eventloop(), sock.handle)
    uv_error("failed to create udp socket", rc)
    sock.status = StatusInit
    iolock_end()
    return sock
end
203

204
function show(io::IO, stream::UDPSocket)
    # Rendered as `TypeName(status string)`.
    return print(io, typeof(stream), "(", uv_status_string(stream), ")")
end
205

206
function Base.fd(sock::UDPSocket)
    # Forward to Base's internal descriptor lookup.
    return Base._fd(sock)
end
207

208
function _uv_hook_close(sock::UDPSocket)
    # Close hook for UDP sockets: mark the socket closed, wake tasks waiting on
    # `cond`, and fail tasks waiting in `recvfrom` with an `EOFError`.
    @lock sock.cond begin
        sock.status = StatusClosed
        notify(sock.cond)
        notify_error(sock.recvnotify, EOFError())
    end
    nothing
end
219

220
# Disables dual stack mode.
221
const UV_TCP_IPV6ONLY = 1
222

223
# Disables dual stack mode. Only available when using ipv6 bind
224
const UV_UDP_IPV6ONLY = 1
225

226
# Indicates message was truncated because read buffer was too small. The
227
# remainder was discarded by the OS.
228
const UV_UDP_PARTIAL = 2
229

230
# Indicates if SO_REUSEADDR will be set when binding the handle in uv_udp_bind. This sets
231
# the SO_REUSEPORT socket flag on the BSDs and OS X. On other Unix platforms, it sets the
232
# SO_REUSEADDR flag. What that means is that multiple threads or processes can bind to the
233
# same address without error (provided they all set the flag) but only the last one to bind
234
# will receive any traffic, in effect "stealing" the port from the previous listener.
235
const UV_UDP_REUSEADDR = 4
236

237
##
238

239
function _bind(sock::Union{TCPServer, TCPSocket}, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
    # Address and port are handed to C in network byte order; the final
    # argument tells the C helper whether `host` is an IPv6 address.
    netaddr = Ref(hton(host.host))
    netport = hton(port)
    return ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
                 sock, netport, netaddr, flags, host isa IPv6)
end
244

245
function _bind(sock::UDPSocket, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
    # Address and port are handed to C in network byte order; the final
    # argument tells the C helper whether `host` is an IPv6 address.
    netaddr = Ref(hton(host.host))
    netport = hton(port)
    return ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
                 sock, netport, netaddr, flags, host isa IPv6)
end
250

251
"""
252
    bind(socket::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only=false, reuseaddr=false, kws...)
253

254
Bind `socket` to the given `host:port`. Note that `0.0.0.0` will listen on all devices.
255

256
* The `ipv6only` parameter disables dual stack mode. If `ipv6only=true`, only an IPv6 stack is created.
257
* If `reuseaddr=true`, multiple threads or processes can bind to the same address without error
258
  if they all set `reuseaddr=true`, but only the last to bind will receive any traffic.
259
"""
260
function bind(sock::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only = false, reuseaddr = false, kws...)
    sock.status == StatusInit || error("$(typeof(sock)) is not in initialization state")

    # Translate the keyword options into libuv bind flags.
    is_udp = sock isa UDPSocket
    flags = 0
    if host isa IPv6 && ipv6only
        flags |= is_udp ? UV_UDP_IPV6ONLY : UV_TCP_IPV6ONLY
    end
    if is_udp && reuseaddr
        flags |= UV_UDP_REUSEADDR
    end

    iolock_begin()
    rc = _bind(sock, host, UInt16(port), UInt32(flags))
    if rc < 0
        iolock_end()
        # Address in use / permission denied / address unavailable are reported
        # as `false`; every other failure is thrown.
        recoverable = rc == UV_EADDRINUSE || rc == UV_EACCES || rc == UV_EADDRNOTAVAIL
        #TODO: the throwing codepath is not currently tested
        recoverable || throw(_UVError("bind", rc))
        return false
    end
    # Servers and UDP sockets are open once bound; a TCPSocket keeps its status.
    if !(sock isa TCPSocket)
        sock.status = StatusOpen
    end
    # Remaining keywords are UDP socket options.
    is_udp && setopt(sock; kws...)
    iolock_end()
    return true
end
289

290
function bind(sock::TCPServer, addr::InetAddr)
    # Unpack the address/port pair and use the general method.
    return bind(sock, addr.host, addr.port)
end
291

292
"""
293
    setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
294

295
Set UDP socket options.
296

297
* `multicast_loop`: loopback for multicast packets (default: `true`).
298
* `multicast_ttl`: TTL for multicast packets (default: `nothing`).
299
* `enable_broadcast`: flag must be set to `true` if socket will be used for broadcast
300
  messages, or else the UDP system will return an access error (default: `false`).
301
* `ttl`: Time-to-live of packets sent on the socket (default: `nothing`).
302
"""
303
function setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
    # Each option left as `nothing` is skipped. Every libuv setter returns a
    # status code, which is passed to `uv_error` so that a negative code throws.
    iolock_begin()
    if sock.status == StatusUninit
        error("Cannot set options on uninitialized socket")
    end
    if multicast_loop !== nothing
        # Pass the raw status code. Comparing it with `< 0` first would hand
        # `uv_error` a Bool, which is never negative, so failures went unreported.
        uv_error("multicast_loop", ccall(:uv_udp_set_multicast_loop, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_loop))
    end
    if multicast_ttl !== nothing
        uv_error("multicast_ttl", ccall(:uv_udp_set_multicast_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_ttl))
    end
    if enable_broadcast !== nothing
        uv_error("enable_broadcast", ccall(:uv_udp_set_broadcast, Cint, (Ptr{Cvoid}, Cint), sock.handle, enable_broadcast))
    end
    if ttl !== nothing
        uv_error("ttl", ccall(:uv_udp_set_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, ttl))
    end
    iolock_end()
    nothing
end
323

324
"""
325
    recv(socket::UDPSocket)
326

327
Read a UDP packet from the specified socket, and return the bytes received. This call blocks.
328
"""
329
function recv(sock::UDPSocket)
    # Same as `recvfrom`, with the sender address discarded.
    _, payload = recvfrom(sock)
    return payload
end
333

334
function uv_recvcb end
335

336
"""
337
    recvfrom(socket::UDPSocket) -> (host_port, data)
338

339
Read a UDP packet from the specified socket, returning a tuple of `(host_port, data)`, where
340
`host_port` will be an InetAddr{IPv4} or InetAddr{IPv6}, as appropriate.
341

342
!!! compat "Julia 1.3"
343
    Prior to Julia version 1.3, the first returned value was an address (`IPAddr`).
344
    In version 1.3 it was changed to an `InetAddr`.
345
"""
346
function recvfrom(sock::UDPSocket)
    iolock_begin()
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
    if !(sock.status == StatusInit || sock.status == StatusOpen || sock.status == StatusActive)
        error("UDPSocket is not initialized and open")
    end
    # Start receiving only if the handle is not already active.
    idle = ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
    if idle
        rc = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                   sock,
                   @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
                   @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
        uv_error("recv_start", rc)
    end
    sock.status = StatusActive
    # Take the notification lock before releasing the I/O lock.
    lock(sock.recvnotify)
    iolock_end()
    try
        Sender = Union{InetAddr{IPv4}, InetAddr{IPv6}}
        Payload = Vector{UInt8}
        # Block until `uv_recvcb` delivers a packet (or an error).
        sender, payload = wait(sock.recvnotify)::Tuple{Sender, Payload}
        return (sender, payload)
    finally
        unlock(sock.recvnotify)
    end
end
371

372
function alloc_buf_hook(sock::UDPSocket, size::UInt)
    # Allocate exactly the requested size and report it back as an Int.
    # size is always 64k from libuv
    return (Libc.malloc(size), Int(size))
end
373

374
function uv_recvcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint)
    # libuv receive callback: turn one datagram (or error) into a notification
    # for the task blocked in `recvfrom`.
    sock = @handle_as handle UDPSocket
    lock(sock.recvnotify)
    try
        base = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf)
        if nread == 0 && addr == C_NULL
            # Nothing was received; just release the buffer.
            Libc.free(base)
        elseif nread < 0
            Libc.free(base)
            notify_error(sock.recvnotify, _UVError("recv", nread))
        elseif (flags & UV_UDP_PARTIAL) > 0
            Libc.free(base)
            notify_error(sock.recvnotify, "Partial message received")
        else
            capacity = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf))
            if capacity - nread < 16384 # waste at most 16k (note: capacity is currently always 64k)
                # Little slack: adopt the malloc'd buffer as the array storage.
                payload = unsafe_wrap(Array, base, nread, own=true)
            else
                # Lots of slack: copy into a right-sized array and free the buffer.
                payload = Vector{UInt8}(undef, nread)
                GC.@preserve payload unsafe_copyto!(pointer(payload), base, nread)
                Libc.free(base)
            end
            # need to check the address type in order to convert to a Julia IPAddr
            if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1
                host = IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr)))
                port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr))
            elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1
                raw = Ref{UInt128}(0)
                # The returned scope id is not used.
                ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, raw)
                host = IPv6(ntoh(raw[]))
                port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr))
            else
                # Neither family recognized: fall back to 0.0.0.0:0.
                host = IPv4(0)
                port = UInt16(0)
            end
            sender = InetAddr(host, port)
            # Wake a single waiter with this packet.
            notify(sock.recvnotify, (sender, payload), all=false)
        end
        # Stop receiving once no task is waiting for data.
        if sock.status == StatusActive && isempty(sock.recvnotify)
            sock.status = StatusOpen
            ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock)
        end
    finally
        unlock(sock.recvnotify)
    end
    nothing
end
420

421
function _send_async(sock::UDPSocket, ipaddr::Union{IPv4, IPv6}, port::UInt16, buf)
    # Queue a UDP send and return the malloc'd request. On success the caller
    # owns the request and must attach the waiting task to it.
    req = Libc.malloc(Base._sizeof_uv_udp_send)
    uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
    netaddr = Ref(hton(ipaddr.host))
    rc = ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}, Cint),
               req, sock, hton(port), netaddr, buf, sizeof(buf),
               @cfunction(Base.uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint)),
               ipaddr isa IPv6)
    if rc < 0
        # The request was never queued, so release it before throwing.
        Libc.free(req)
        uv_error("send", rc)
    end
    return req
end
435

436
"""
437
    send(socket::UDPSocket, host::IPAddr, port::Integer, msg)
438

439
Send `msg` over `socket` to `host:port`.
440
"""
441
function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg)
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
    iolock_begin()
    if !(sock.status == StatusInit || sock.status == StatusOpen || sock.status == StatusActive)
        error("UDPSocket is not initialized and open")
    end
    req = _send_async(sock, ipaddr, UInt16(port), msg)
    task = current_task()
    preserve_handle(task)
    Base.sigatomic_begin()
    # Attach this task to the request so the write callback can wake it.
    uv_req_set_data(req, task)
    iolock_end()
    rc = try
        Base.sigatomic_end()
        # Suspend until woken with the send status.
        wait()::Cint
    finally
        Base.sigatomic_end()
        iolock_begin()
        # If we are still sitting in a wait queue, take ourselves out of it.
        queue = task.queue
        if queue !== nothing
            Base.list_deletefirst!(queue::Base.IntrusiveLinkedList{Task}, task)
        end
        if uv_req_data(req) == C_NULL
            # done with req
            Libc.free(req)
        else
            # req is still alive,
            # so make sure we won't get spurious notifications later
            uv_req_set_data(req, C_NULL)
        end
        iolock_end()
        unpreserve_handle(task)
    end
    uv_error("send", rc)
    nothing
end
474

475

476
#from `connect`
477
function uv_connectcb_tcp(conn::Ptr{Cvoid}, status::Cint)
    # Connect callback for TCP: find the socket behind the connect request
    # and pass it to the shared handler.
    handle = ccall(:jl_uv_connect_handle, Ptr{Cvoid}, (Ptr{Cvoid},), conn)
    stream = @handle_as handle TCPSocket
    connectcb(conn, status, handle, stream)
end
482

483
function uv_connectcb_pipe(conn::Ptr{Cvoid}, status::Cint)
    # Connect callback for pipes: find the endpoint behind the connect request
    # and pass it to the shared handler.
    handle = ccall(:jl_uv_connect_handle, Ptr{Cvoid}, (Ptr{Cvoid},), conn)
    stream = @handle_as handle PipeEndpoint
    connectcb(conn, status, handle, stream)
end
488

489
function connectcb(conn::Ptr{Cvoid}, status::Cint, hand::Ptr{Cvoid}, sock::LibuvStream)
    # Shared connect-completion handler: record the outcome on `sock`, wake
    # tasks waiting on it, then free the connect request.
    @lock sock.cond begin
        if status < 0
            sock.readerror = _UVError("connect", status) # TODO: perhaps we should not reuse readerror for this
            # Force-close the handle unless a close is already under way or done.
            if sock.status != StatusClosed && sock.status != StatusClosing
                ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand)
                sock.status = StatusClosing
            end
        elseif sock.status != StatusClosed && sock.status != StatusClosing
            # success, and nobody closed the socket in the meantime
            sock.status = StatusOpen
        end
        notify(sock.cond)
    end
    Libc.free(conn)
    nothing
end
510

511
function connect!(sock::TCPSocket, host::Union{IPv4, IPv6}, port::Integer)
    # Start an asynchronous connect; completion arrives via `uv_connectcb_tcp`.
    iolock_begin()
    sock.status == StatusInit || error("TCPSocket is not in initialization state")
    if port < 0 || port > typemax(UInt16)
        throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port"))
    end
    # Address and port are handed to C in network byte order.
    netaddr = Ref(hton(host.host))
    rc = ccall(:jl_tcp_connect, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cint),
               sock, netaddr, hton(UInt16(port)), @cfunction(uv_connectcb_tcp, Cvoid, (Ptr{Cvoid}, Cint)),
               host isa IPv6)
    uv_error("connect", rc)
    sock.status = StatusConnecting
    iolock_end()
    nothing
end
527

528
function connect!(sock::TCPSocket, addr::InetAddr)
    # Unpack the address/port pair and use the general method.
    return connect!(sock, addr.host, addr.port)
end
529

530
function wait_connected(x::LibuvStream)
    # Block until `x` has left the connecting state, rethrowing any error a
    # callback recorded in `x.readerror`.
    iolock_begin()
    check_open(x)
    if !isopen(x) && x.readerror !== nothing
        throw(x.readerror)
    end
    preserve_handle(x)
    lock(x.cond)
    try
        while x.status == StatusConnecting
            # Release the I/O lock while sleeping, then retake both locks with
            # the I/O lock acquired first.
            iolock_end()
            wait(x.cond)
            unlock(x.cond)
            iolock_begin()
            lock(x.cond)
        end
        if !isopen(x) && x.readerror !== nothing
            throw(x.readerror)
        end
    finally
        unlock(x.cond)
        unpreserve_handle(x)
    end
    iolock_end()
    nothing
end
552

553
# Default Host to localhost
554

555
"""
556
    connect([host], port::Integer) -> TCPSocket
557

558
Connect to the host `host` on port `port`.
559
"""
560
function connect(sock::TCPSocket, port::Integer)
    # Host omitted: connect to `localhost`.
    return connect(sock, localhost, port)
end
function connect(port::Integer)
    # Host omitted: connect to `localhost`.
    return connect(localhost, port)
end

# Valid connect signatures for TCP
function connect(host::AbstractString, port::Integer)
    return connect(TCPSocket(), host, port)
end
function connect(addr::IPAddr, port::Integer)
    return connect(TCPSocket(), addr, port)
end
function connect(addr::InetAddr)
    return connect(TCPSocket(), addr)
end
567

568
function connect!(sock::TCPSocket, host::AbstractString, port::Integer)
    # Check the socket state before resolving the host name.
    sock.status == StatusInit || error("TCPSocket is not in initialization state")
    resolved = getaddrinfo(host)
    connect!(sock, resolved, port)
    return sock
end
576

577
function connect(sock::LibuvStream, args...)
    # Start the connection, then block until it completes or fails.
    connect!(sock, args...)
    wait_connected(sock)
    return sock
end
582

583
"""
584
    nagle(socket::Union{TCPServer, TCPSocket}, enable::Bool)
585

586
Nagle's algorithm batches multiple small TCP packets into larger
587
ones. This can improve throughput but worsen latency. Nagle's algorithm
588
is enabled by default. This function sets whether Nagle's algorithm is
589
active on a given TCP server or socket. The opposite option is called
590
`TCP_NODELAY` in other languages.
591

592
!!! compat "Julia 1.3"
593
    This function requires Julia 1.3 or later.
594
"""
595
function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool)
    # disable or enable Nagle's algorithm on all OSes
    iolock_begin()
    check_open(sock)
    # libuv's switch is "nodelay", the inverse of enabling Nagle.
    nodelay = Cint(!enable)
    rc = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, nodelay)
    # TODO: check rc
    iolock_end()
    # The raw libuv status code is returned to the caller unchecked.
    return rc
end
604

605
"""
606
    quickack(socket::Union{TCPServer, TCPSocket}, enable::Bool)
607

608
On Linux systems, enable or disable the `TCP_QUICKACK` option on `socket`.
609
"""
610
function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool)
    iolock_begin()
    check_open(sock)
    @static if Sys.islinux()
        # tcp_quickack is a linux only option
        rc = ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable))
        if rc < 0
            # Failure is reported as a warning (at most once), not thrown.
            @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
        end
    end
    iolock_end()
    nothing
end
622

623

624
##
625

626
const BACKLOG_DEFAULT = 511
627

628
"""
629
    listen([addr, ]port::Integer; backlog::Integer=BACKLOG_DEFAULT) -> TCPServer
630

631
Listen on port on the address specified by `addr`.
632
By default this listens on `localhost` only.
633
To listen on all interfaces pass `IPv4(0)` or `IPv6(0)` as appropriate.
634
`backlog` determines how many connections can be pending (not having
635
called [`accept`](@ref)) before the server will begin to
636
reject them. The default value of `backlog` is 511.
637
"""
638
function listen(addr; backlog::Integer=BACKLOG_DEFAULT)
    # Create a server, bind it to `addr`, and start listening.
    server = TCPServer()
    if !bind(server, addr)
        error("cannot bind to port; may already be in use or access denied")
    end
    listen(server; backlog=backlog)
    return server
end
644
function listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT)
    # Address omitted: listen on `localhost`.
    return listen(localhost, port; backlog=backlog)
end
function listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT)
    return listen(InetAddr(host, port); backlog=backlog)
end
646

647
function listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
    # Throwing wrapper around `trylisten`.
    rc = trylisten(sock; backlog=backlog)
    uv_error("listen", rc)
    return sock
end
651

652
# from `listen`
653
function uv_connectioncb(stream::Ptr{Cvoid}, status::Cint)
    # libuv connection callback: wake tasks blocked in `accept`, with an error
    # when libuv reported a negative status.
    server = @handle_as stream LibuvServer
    @lock server.cond begin
        if status < 0
            notify_error(server.cond, _UVError("connection", status))
        else
            notify(server.cond)
        end
    end
    nothing
end
667

668
function trylisten(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
    # Non-throwing listen: returns the libuv status code (0 on success, a
    # negative error code on failure). `listen` is the throwing wrapper.
    iolock_begin()
    check_open(sock)
    err = ccall(:uv_listen, Cint, (Ptr{Cvoid}, Cint, Ptr{Cvoid}),
                sock, backlog, @cfunction(uv_connectioncb, Cvoid, (Ptr{Cvoid}, Cint)))
    # Only mark the server active when libuv actually started listening.
    # Otherwise a later `accept` would pass its "listen has been called"
    # check and wait on a server that is not listening.
    if err == 0
        sock.status = StatusActive
    end
    iolock_end()
    return err
end
677

678
##
679

680
function accept_nonblock(server::TCPServer, client::TCPSocket)
    # Try to accept one pending connection without blocking and return the
    # libuv status code; `client` becomes open only on success.
    iolock_begin()
    client.status == StatusInit || error("client TCPSocket is not in initialization state")
    rc = ccall(:uv_accept, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), server.handle, client.handle)
    if rc == 0
        client.status = StatusOpen
    end
    iolock_end()
    return rc
end
692

693
function accept_nonblock(server::TCPServer)
    # Throwing variant: accept into a fresh socket, or raise the libuv error.
    sock = TCPSocket()
    rc = accept_nonblock(server, sock)
    uv_error("accept", rc)
    return sock
end
698

699
function accept(server::LibuvServer, client::LibuvStream)
    iolock_begin()
    listening = server.status == StatusActive ||
                server.status == StatusClosing ||
                server.status == StatusClosed
    if !listening
        throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
    end
    while isopen(server)
        rc = accept_nonblock(server, client)
        if rc == 0
            iolock_end()
            return client
        end
        # UV_EAGAIN means nothing is pending yet; any other failure is thrown.
        rc == UV_EAGAIN || uv_error("accept", rc)
        # Sleep until the connection callback signals, taking the condition
        # lock before releasing the I/O lock.
        preserve_handle(server)
        lock(server.cond)
        iolock_end()
        try
            wait(server.cond)
        finally
            unlock(server.cond)
            unpreserve_handle(server)
        end
        iolock_begin()
    end
    # The server is no longer open: report it as an aborted connection.
    uv_error("accept", UV_ECONNABORTED)
    nothing
end
726

727
## Utility functions
728

729
const localhost = ip"127.0.0.1"
730

731
"""
732
    listenany([host::IPAddr,] port_hint; backlog::Integer=BACKLOG_DEFAULT) -> (UInt16, TCPServer)
733

734
Create a `TCPServer` on any available port, using `port_hint` as a starting point. Returns a
tuple of the actual port that the server was created on and the server itself.
The `backlog` argument sets the maximum number of pending connections that may be queued.
737
"""
738
function listenany(host::IPAddr, default_port; backlog::Integer=BACKLOG_DEFAULT)
    # Scan upward from `default_port` until a port can be bound and listened on.
    addr = InetAddr(host, default_port)
    while true
        sock = TCPServer()
        if bind(sock, addr) && trylisten(sock; backlog) == 0
            # Port 0 asks the OS to choose, so read back the port actually
            # bound. This tests the port that was bound rather than the
            # original hint, so it also covers the scan wrapping from 65535
            # around to 0, which would otherwise be reported as port 0.
            if addr.port == 0
                _addr, port = getsockname(sock)
                return (port, sock)
            end
            return (addr.port, sock)
        end
        close(sock)
        # `addr.port` is a UInt16, so this wraps around after 65535.
        addr = InetAddr(addr.host, addr.port + UInt16(1))
        # Back at the starting port: every port has been tried.
        if addr.port == default_port
            error("no ports available")
        end
    end
end
756

757
function listenany(default_port; backlog::Integer=BACKLOG_DEFAULT)
    # Host omitted: search for a port on `localhost`.
    return listenany(localhost, default_port; backlog)
end
758

759
function udp_set_membership(sock::UDPSocket, group_addr::String,
                            interface_addr::Union{Nothing, String}, operation)
    # A missing interface is passed to libuv as a NULL string.
    iface = interface_addr === nothing ? C_NULL : interface_addr
    rc = ccall(:uv_udp_set_membership, Cint,
               (Ptr{Cvoid}, Cstring, Cstring, Cint),
               sock.handle, group_addr, iface, operation)
    uv_error("uv_udp_set_membership", rc)
    return
end
770

771
"""
772
    join_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
773

774
Join a socket to a particular multicast group defined by `group_addr`.
775
If `interface_addr` is given, specifies a particular interface for multi-homed
776
systems.  Use `leave_multicast_group()` to disable reception of a group.
777
"""
778
function join_multicast_group(sock::UDPSocket, group_addr::String,
                              interface_addr::Union{Nothing, String} = nothing)
    # Operation code 1 requests joining the group.
    join_op = 1
    return udp_set_membership(sock, group_addr, interface_addr, join_op)
end
function join_multicast_group(sock::UDPSocket, group_addr::IPAddr,
                              interface_addr::Union{Nothing, IPAddr} = nothing)
    # Convert the addresses to text and defer to the String method.
    iface = interface_addr === nothing ? nothing : string(interface_addr)
    return join_multicast_group(sock, string(group_addr), iface)
end
789

790
"""
791
    leave_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
792

793
Remove a socket from a particular multicast group defined by `group_addr`.
794
If `interface_addr` is given, specifies a particular interface for multi-homed
795
systems.  Use `join_multicast_group()` to enable reception of a group.
796
"""
797
function leave_multicast_group(sock::UDPSocket, group_addr::String,
                               interface_addr::Union{Nothing, String} = nothing)
    # Operation code 0 requests leaving the group.
    leave_op = 0
    return udp_set_membership(sock, group_addr, interface_addr, leave_op)
end
function leave_multicast_group(sock::UDPSocket, group_addr::IPAddr,
                               interface_addr::Union{Nothing, IPAddr} = nothing)
    # Convert the addresses to text and defer to the String method.
    iface = interface_addr === nothing ? nothing : string(interface_addr)
    return leave_multicast_group(sock, string(group_addr), iface)
end
808

809
"""
810
    getsockname(sock::Union{TCPServer, TCPSocket}) -> (IPAddr, UInt16)
811

812
Get the IP address and port that the given socket is bound to.
813
"""
814
function getsockname(sock::Union{TCPSocket, TCPServer})
    # `true` selects the local end of the socket.
    return _sockname(sock, true)
end
815

816

817
"""
818
    getpeername(sock::TCPSocket) -> (IPAddr, UInt16)
819

820
Get the IP address and port of the remote endpoint that the given
821
socket is connected to. Valid only for connected TCP sockets.
822
"""
823
function getpeername(sock::TCPSocket)
    # `false` selects the remote end of the socket.
    return _sockname(sock, false)
end
824

825
function _sockname(sock, self=true)
    # Shared implementation of `getsockname` (`self == true`) and `getpeername`
    # (`self == false`). Returns `(addr, port)` with `addr` an `IPv4` or `IPv6`.
    # Throws if the socket is neither in its initial state nor open, if the
    # libuv query fails, or if the reported address family is unsupported.
    sock.status == StatusInit || check_open(sock)
    rport = Ref{Cushort}(0)
    raddress = zeros(UInt8, 16) # large enough for an IPv6 address
    rfamily = Ref{Cuint}(0)

    iolock_begin()
    if self
        r = ccall(:jl_tcp_getsockname, Int32,
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
                sock.handle, rport, raddress, rfamily)
    else
        r = ccall(:jl_tcp_getpeername, Int32,
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
                sock.handle, rport, raddress, rfamily)
    end
    iolock_end()
    uv_error("cannot obtain socket name", r)
    port = ntoh(rport[])
    af_inet6 = @static if Sys.iswindows() # AF_INET6 in <sys/socket.h>
        23
    elseif Sys.isapple()
        30
    elseif Sys.KERNEL ∈ (:FreeBSD, :DragonFly)
        28
    elseif Sys.KERNEL ∈ (:NetBSD, :OpenBSD)
        24
    else
        10
    end

    # The address bytes are read through a raw pointer, so `raddress` must be
    # kept rooted with `GC.@preserve` for the duration of each load.
    if rfamily[] == 2 # AF_INET
        # The IPv4 address occupies the first four bytes of the buffer.
        naddr = GC.@preserve raddress ntoh(unsafe_load(Ptr{Cuint}(pointer(raddress)), 1))
        addr = IPv4(naddr)
    elseif rfamily[] == af_inet6
        naddr = GC.@preserve raddress ntoh(unsafe_load(Ptr{UInt128}(pointer(raddress)), 1))
        addr = IPv6(naddr)
    else
        error(string("unsupported address family: ", rfamily[]))
    end
    return addr, port
end
868

869
# domain sockets
870

871
include("PipeServer.jl")
872

873
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc