• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37798

05 Jun 2024 07:57AM UTC coverage: 83.152% (-3.8%) from 86.908%
#37798

push

local

web-flow
Use the public `wait()` in the `errormonitor()` docstring (#54650)

72847 of 87607 relevant lines covered (83.15%)

14515294.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.62
/stdlib/Sockets/src/Sockets.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
4
Support for sockets. Provides [`IPAddr`](@ref) and subtypes, [`TCPSocket`](@ref), and [`UDPSocket`](@ref).
5
"""
6
module Sockets
7

8
export
9
    accept,
10
    bind,
11
    connect,
12
    getaddrinfo,
13
    getalladdrinfo,
14
    getnameinfo,
15
    getipaddr,
16
    getipaddrs,
17
    islinklocaladdr,
18
    getpeername,
19
    getsockname,
20
    listen,
21
    listenany,
22
    recv,
23
    recvfrom,
24
    send,
25
    join_multicast_group,
26
    leave_multicast_group,
27
    TCPSocket,
28
    UDPSocket,
29
    @ip_str,
30
    IPAddr,
31
    IPv4,
32
    IPv6
33

34
import Base: isless, show, print, parse, bind, alloc_buf_hook, _uv_hook_close
35

36
using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize,
37
    notify_error, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError,
38
    eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive,
39
    preserve_handle, unpreserve_handle, iolock_begin, iolock_end,
40
    uv_status_string, check_open, OS_HANDLE, RawFD,
41
    UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL,
42
    UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS,
43
    UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL,
44
    UV_EAI_FAMILY, UV_EAI_NODATA, UV_EAI_NONAME,
45
    UV_EAI_OVERFLOW, UV_EAI_PROTOCOL, UV_EAI_SERVICE,
46
    UV_EAI_SOCKTYPE, UV_EAI_MEMORY
47

48
include("IPAddr.jl")
49
include("addrinfo.jl")
50

51
"""
52
    TCPSocket(; delay=true)
53

54
Open a TCP socket using libuv. If `delay` is true, libuv delays creation of the
55
socket's file descriptor till the first [`bind`](@ref) call. `TCPSocket` has various
56
fields to denote the state of the socket as well as its send/receive buffers.
57
"""
58
mutable struct TCPSocket <: LibuvStream
59
    handle::Ptr{Cvoid}
60
    status::Int
61
    buffer::IOBuffer
62
    cond::Base.ThreadSynchronizer
63
    readerror::Any
64
    sendbuf::Union{IOBuffer, Nothing}
65
    lock::ReentrantLock # advisory lock
66
    throttle::Int
67

68
    function TCPSocket(handle::Ptr{Cvoid}, status)
69
        tcp = new(
1,358✔
70
                handle,
71
                status,
72
                PipeBuffer(),
73
                Base.ThreadSynchronizer(),
74
                nothing,
75
                nothing,
76
                ReentrantLock(),
77
                Base.DEFAULT_READ_BUFFER_SZ)
78
        associate_julia_struct(tcp.handle, tcp)
1,358✔
79
        finalizer(uvfinalize, tcp)
1,358✔
80
        return tcp
1,358✔
81
    end
82
end
83

84
# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
85
function TCPSocket(; delay=true)
2,716✔
86
    tcp = TCPSocket(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
1,358✔
87
    af_spec = delay ? 0 : 2   # AF_UNSPEC is 0, AF_INET is 2
1,358✔
88
    iolock_begin()
1,358✔
89
    err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
1,358✔
90
                eventloop(), tcp.handle, af_spec)
91
    uv_error("failed to create tcp socket", err)
1,358✔
92
    tcp.status = StatusInit
1,358✔
93
    iolock_end()
1,358✔
94
    return tcp
1,358✔
95
end
96

97
function TCPSocket(fd::OS_HANDLE)
×
98
    tcp = TCPSocket()
×
99
    iolock_begin()
×
100
    err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), tcp.handle, fd)
×
101
    uv_error("tcp_open", err)
×
102
    tcp.status = StatusOpen
×
103
    iolock_end()
×
104
    return tcp
×
105
end
106
if OS_HANDLE != RawFD
107
    TCPSocket(fd::RawFD) = TCPSocket(Libc._get_osfhandle(fd))
×
108
end
109

110

111
mutable struct TCPServer <: LibuvServer
112
    handle::Ptr{Cvoid}
113
    status::Int
114
    cond::Base.ThreadSynchronizer
115

116
    function TCPServer(handle::Ptr{Cvoid}, status)
117
        tcp = new(
680✔
118
            handle,
119
            status,
120
            Base.ThreadSynchronizer())
121
        associate_julia_struct(tcp.handle, tcp)
680✔
122
        finalizer(uvfinalize, tcp)
680✔
123
        return tcp
680✔
124
    end
125
end
126

127
# Keyword arg "delay": if true, libuv delays creation of socket fd till bind.
128
# It can be set to false if there is a need to set socket options before
129
# further calls to `bind` and `listen`, e.g. `SO_REUSEPORT`.
130
function TCPServer(; delay=true)
1,360✔
131
    tcp = TCPServer(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
680✔
132
    af_spec = delay ? 0 : 2   # AF_UNSPEC is 0, AF_INET is 2
680✔
133
    iolock_begin()
680✔
134
    err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
680✔
135
                eventloop(), tcp.handle, af_spec)
136
    uv_error("failed to create tcp server", err)
680✔
137
    tcp.status = StatusInit
680✔
138
    iolock_end()
680✔
139
    return tcp
680✔
140
end
141

142
"""
143
    accept(server[, client])
144

145
Accepts a connection on the given server and returns a connection to the client. An
146
uninitialized client stream may be provided, in which case it will be used instead of
147
creating a new stream.
148
"""
149
accept(server::TCPServer) = accept(server, TCPSocket())
732✔
150

151
function accept(callback, server::LibuvServer)
×
152
    task = @async try
×
153
            while true
×
154
                client = accept(server)
×
155
                callback(client)
×
156
            end
×
157
        catch ex
158
            # accept below may explicitly throw UV_ECONNABORTED:
159
            # filter that out since we expect that error
160
            if !(ex isa IOError && ex.code == UV_ECONNABORTED) || isopen(server)
×
161
                rethrow()
×
162
            end
163
        end
164
    return task # caller is responsible for checking for errors
×
165
end
166

167

168
# UDP
169
"""
170
    UDPSocket()
171

172
Open a UDP socket using libuv. `UDPSocket` has various
173
fields to denote the state of the socket.
174
"""
175
mutable struct UDPSocket <: LibuvStream
176
    handle::Ptr{Cvoid}
177
    status::Int
178
    recvnotify::Base.ThreadSynchronizer
179
    cond::Base.ThreadSynchronizer
180

181
    function UDPSocket(handle::Ptr{Cvoid}, status)
×
182
        cond = Base.ThreadSynchronizer()
×
183
        udp = new(handle, status, Base.ThreadSynchronizer(cond.lock), cond)
×
184
        associate_julia_struct(udp.handle, udp)
×
185
        finalizer(uvfinalize, udp)
×
186
        return udp
×
187
    end
188
end
189
function UDPSocket()
×
190
    this = UDPSocket(Libc.malloc(Base._sizeof_uv_udp), StatusUninit)
×
191
    iolock_begin()
×
192
    err = ccall(:uv_udp_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}),
×
193
                eventloop(), this.handle)
194
    uv_error("failed to create udp socket", err)
×
195
    this.status = StatusInit
×
196
    iolock_end()
×
197
    return this
×
198
end
199

200
show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")")
×
201

202
function _uv_hook_close(sock::UDPSocket)
×
203
    lock(sock.cond)
×
204
    try
×
205
        sock.status = StatusClosed
×
206
        notify(sock.cond)
×
207
        notify_error(sock.recvnotify, EOFError())
×
208
    finally
209
        unlock(sock.cond)
×
210
    end
211
    nothing
×
212
end
213

214
# Disables dual stack mode.
215
const UV_TCP_IPV6ONLY = 1
216

217
# Disables dual stack mode. Only available when using ipv6 binf
218
const UV_UDP_IPV6ONLY = 1
219

220
# Indicates message was truncated because read buffer was too small. The
221
# remainder was discarded by the OS.
222
const UV_UDP_PARTIAL = 2
223

224
# Indicates if SO_REUSEADDR will be set when binding the handle in uv_udp_bind. This sets
225
# the SO_REUSEPORT socket flag on the BSDs and OS X. On other Unix platforms, it sets the
226
# SO_REUSEADDR flag. What that means is that multiple threads or processes can bind to the
227
# same address without error (provided they all set the flag) but only the last one to bind
228
# will receive any traffic, in effect "stealing" the port from the previous listener.
229
const UV_UDP_REUSEADDR = 4
230

231
##
232

233
function _bind(sock::Union{TCPServer, TCPSocket}, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
234
    host_in = Ref(hton(host.host))
683✔
235
    return ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
683✔
236
            sock, hton(port), host_in, flags, host isa IPv6)
237
end
238

239
function _bind(sock::UDPSocket, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
×
240
    host_in = Ref(hton(host.host))
×
241
    return ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
×
242
            sock, hton(port), host_in, flags, host isa IPv6)
243
end
244

245
"""
246
    bind(socket::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only=false, reuseaddr=false, kws...)
247

248
Bind `socket` to the given `host:port`. Note that `0.0.0.0` will listen on all devices.
249

250
* The `ipv6only` parameter disables dual stack mode. If `ipv6only=true`, only an IPv6 stack is created.
251
* If `reuseaddr=true`, multiple threads or processes can bind to the same address without error
252
  if they all set `reuseaddr=true`, but only the last to bind will receive any traffic.
253
"""
254
function bind(sock::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only = false, reuseaddr = false, kws...)
1,366✔
255
    if sock.status != StatusInit
683✔
256
        error("$(typeof(sock)) is not in initialization state")
×
257
    end
258
    flags = 0
683✔
259
    if isa(host, IPv6) && ipv6only
683✔
260
        flags |= isa(sock, UDPSocket) ? UV_UDP_IPV6ONLY : UV_TCP_IPV6ONLY
×
261
    end
262
    if isa(sock, UDPSocket) && reuseaddr
683✔
263
        flags |= UV_UDP_REUSEADDR
×
264
    end
265
    iolock_begin()
683✔
266
    err = _bind(sock, host, UInt16(port), UInt32(flags))
683✔
267
    if err < 0
683✔
268
        iolock_end()
×
269
        if err != UV_EADDRINUSE && err != UV_EACCES && err != UV_EADDRNOTAVAIL
×
270
            #TODO: this codepath is not currently tested
271
            throw(_UVError("bind", err))
×
272
        else
273
            return false
×
274
        end
275
    end
276
    if isa(sock, TCPServer) || isa(sock, UDPSocket)
683✔
277
        sock.status = StatusOpen
680✔
278
    end
279
    isa(sock, UDPSocket) && setopt(sock; kws...)
683✔
280
    iolock_end()
683✔
281
    return true
683✔
282
end
283

284
bind(sock::TCPServer, addr::InetAddr) = bind(sock, addr.host, addr.port)
680✔
285

286
"""
287
    setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
288

289
Set UDP socket options.
290

291
* `multicast_loop`: loopback for multicast packets (default: `true`).
292
* `multicast_ttl`: TTL for multicast packets (default: `nothing`).
293
* `enable_broadcast`: flag must be set to `true` if socket will be used for broadcast
294
  messages, or else the UDP system will return an access error (default: `false`).
295
* `ttl`: Time-to-live of packets sent on the socket (default: `nothing`).
296
"""
297
function setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
×
298
    iolock_begin()
×
299
    if sock.status == StatusUninit
×
300
        error("Cannot set options on uninitialized socket")
×
301
    end
302
    if multicast_loop !== nothing
×
303
        uv_error("multicast_loop", ccall(:uv_udp_set_multicast_loop, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_loop) < 0)
×
304
    end
305
    if multicast_ttl !== nothing
×
306
        uv_error("multicast_ttl", ccall(:uv_udp_set_multicast_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_ttl))
×
307
    end
308
    if enable_broadcast !== nothing
×
309
        uv_error("enable_broadcast", ccall(:uv_udp_set_broadcast, Cint, (Ptr{Cvoid}, Cint), sock.handle, enable_broadcast))
×
310
    end
311
    if ttl !== nothing
×
312
        uv_error("ttl", ccall(:uv_udp_set_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, ttl))
×
313
    end
314
    iolock_end()
×
315
    nothing
×
316
end
317

318
"""
319
    recv(socket::UDPSocket)
320

321
Read a UDP packet from the specified socket, and return the bytes received. This call blocks.
322
"""
323
function recv(sock::UDPSocket)
×
324
    addr, data = recvfrom(sock)
×
325
    return data
×
326
end
327

328
function uv_recvcb end
329

330
"""
331
    recvfrom(socket::UDPSocket) -> (host_port, data)
332

333
Read a UDP packet from the specified socket, returning a tuple of `(host_port, data)`, where
334
`host_port` will be an InetAddr{IPv4} or InetAddr{IPv6}, as appropriate.
335

336
!!! compat "Julia 1.3"
337
    Prior to Julia version 1.3, the first returned value was an address (`IPAddr`).
338
    In version 1.3 it was changed to an `InetAddr`.
339
"""
340
function recvfrom(sock::UDPSocket)
×
341
    iolock_begin()
×
342
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
343
    if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive
×
344
        error("UDPSocket is not initialized and open")
×
345
    end
346
    if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
×
347
        err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
×
348
                    sock,
349
                    @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
×
350
                    @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
×
351
        uv_error("recv_start", err)
×
352
    end
353
    sock.status = StatusActive
×
354
    lock(sock.recvnotify)
×
355
    iolock_end()
×
356
    try
×
357
        From = Union{InetAddr{IPv4}, InetAddr{IPv6}}
×
358
        Data = Vector{UInt8}
×
359
        from, data = wait(sock.recvnotify)::Tuple{From, Data}
×
360
        return (from, data)
×
361
    finally
362
        unlock(sock.recvnotify)
×
363
    end
364
end
365

366
alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), Int(size)) # size is always 64k from libuv
×
367

368
function uv_recvcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint)
×
369
    sock = @handle_as handle UDPSocket
×
370
    lock(sock.recvnotify)
×
371
    try
×
372
        buf_addr = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf)
×
373
        if nread == 0 && addr == C_NULL
×
374
            Libc.free(buf_addr)
×
375
        elseif nread < 0
×
376
            Libc.free(buf_addr)
×
377
            notify_error(sock.recvnotify, _UVError("recv", nread))
×
378
        elseif flags & UV_UDP_PARTIAL > 0
×
379
            Libc.free(buf_addr)
×
380
            notify_error(sock.recvnotify, "Partial message received")
×
381
        else
382
            buf_size = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf))
×
383
            if buf_size - nread < 16384 # waste at most 16k (note: buf_size is currently always 64k)
×
384
                buf = unsafe_wrap(Array, buf_addr, nread, own=true)
×
385
            else
386
                buf = Vector{UInt8}(undef, nread)
×
387
                GC.@preserve buf unsafe_copyto!(pointer(buf), buf_addr, nread)
×
388
                Libc.free(buf_addr)
×
389
            end
390
            # need to check the address type in order to convert to a Julia IPAddr
391
            host = IPv4(0)
×
392
            port = UInt16(0)
×
393
            if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1
×
394
                host = IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr)))
×
395
                port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr))
×
396
            elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1
×
397
                tmp = Ref{UInt128}(0)
×
398
                scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, tmp)
×
399
                host = IPv6(ntoh(tmp[]))
×
400
                port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr))
×
401
            end
402
            from = InetAddr(host, port)
×
403
            notify(sock.recvnotify, (from, buf), all=false)
×
404
        end
405
        if sock.status == StatusActive && isempty(sock.recvnotify)
×
406
            sock.status = StatusOpen
×
407
            ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock)
×
408
        end
409
    finally
410
        unlock(sock.recvnotify)
×
411
    end
412
    nothing
×
413
end
414

415
function _send_async(sock::UDPSocket, ipaddr::Union{IPv4, IPv6}, port::UInt16, buf)
×
416
    req = Libc.malloc(Base._sizeof_uv_udp_send)
×
417
    uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
×
418
    host_in = Ref(hton(ipaddr.host))
×
419
    err = ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}, Cint),
×
420
                req, sock, hton(port), host_in, buf, sizeof(buf),
421
                @cfunction(Base.uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint)),
×
422
                ipaddr isa IPv6)
423
    if err < 0
×
424
        Libc.free(req)
×
425
        uv_error("send", err)
×
426
    end
427
    return req
×
428
end
429

430
"""
431
    send(socket::UDPSocket, host::IPAddr, port::Integer, msg)
432

433
Send `msg` over `socket` to `host:port`.
434
"""
435
function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg)
×
436
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
437
    iolock_begin()
×
438
    if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive
×
439
        error("UDPSocket is not initialized and open")
×
440
    end
441
    uvw = _send_async(sock, ipaddr, UInt16(port), msg)
×
442
    ct = current_task()
×
443
    preserve_handle(ct)
×
444
    Base.sigatomic_begin()
×
445
    uv_req_set_data(uvw, ct)
×
446
    iolock_end()
×
447
    status = try
×
448
        Base.sigatomic_end()
×
449
        wait()::Cint
×
450
    finally
451
        Base.sigatomic_end()
×
452
        iolock_begin()
×
453
        ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
×
454
        if uv_req_data(uvw) != C_NULL
×
455
            # uvw is still alive,
456
            # so make sure we won't get spurious notifications later
457
            uv_req_set_data(uvw, C_NULL)
×
458
        else
459
            # done with uvw
460
            Libc.free(uvw)
×
461
        end
462
        iolock_end()
×
463
        unpreserve_handle(ct)
×
464
    end
465
    uv_error("send", status)
×
466
    nothing
×
467
end
468

469

470
#from `connect`
471
function uv_connectcb(conn::Ptr{Cvoid}, status::Cint)
1,247✔
472
    hand = ccall(:jl_uv_connect_handle, Ptr{Cvoid}, (Ptr{Cvoid},), conn)
1,247✔
473
    sock = @handle_as hand LibuvStream
1,247✔
474
    lock(sock.cond)
2,494✔
475
    try
1,247✔
476
        if status >= 0 # success
1,247✔
477
            if !(sock.status == StatusClosed || sock.status == StatusClosing)
2,494✔
478
                sock.status = StatusOpen
1,247✔
479
            end
480
        else
481
            sock.readerror = _UVError("connect", status) # TODO: perhaps we should not reuse readerror for this
×
482
            if !(sock.status == StatusClosed || sock.status == StatusClosing)
×
483
                ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand)
×
484
                sock.status = StatusClosing
×
485
            end
486
        end
487
        notify(sock.cond)
1,247✔
488
    finally
489
        unlock(sock.cond)
2,494✔
490
    end
491
    Libc.free(conn)
1,247✔
492
    nothing
1,247✔
493
end
494

495
function connect!(sock::TCPSocket, host::Union{IPv4, IPv6}, port::Integer)
626✔
496
    iolock_begin()
626✔
497
    if sock.status != StatusInit
626✔
498
        error("TCPSocket is not in initialization state")
×
499
    end
500
    if !(0 <= port <= typemax(UInt16))
626✔
501
        throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port"))
×
502
    end
503
    host_in = Ref(hton(host.host))
626✔
504
    uv_error("connect", ccall(:jl_tcp_connect, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cint),
626✔
505
                              sock, host_in, hton(UInt16(port)), @cfunction(uv_connectcb, Cvoid, (Ptr{Cvoid}, Cint)),
506
                              host isa IPv6))
507
    sock.status = StatusConnecting
626✔
508
    iolock_end()
626✔
509
    nothing
626✔
510
end
511

512
connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port)
×
513

514
function wait_connected(x::LibuvStream)
1,304✔
515
    iolock_begin()
1,304✔
516
    check_open(x)
1,304✔
517
    isopen(x) || x.readerror === nothing || throw(x.readerror)
1,304✔
518
    preserve_handle(x)
1,304✔
519
    lock(x.cond)
1,304✔
520
    try
1,304✔
521
        while x.status == StatusConnecting
2,551✔
522
            iolock_end()
1,247✔
523
            wait(x.cond)
1,247✔
524
            unlock(x.cond)
1,247✔
525
            iolock_begin()
1,247✔
526
            lock(x.cond)
1,247✔
527
        end
1,247✔
528
        isopen(x) || x.readerror === nothing || throw(x.readerror)
1,304✔
529
    finally
530
        unlock(x.cond)
1,304✔
531
        unpreserve_handle(x)
1,304✔
532
    end
533
    iolock_end()
1,304✔
534
    nothing
1,304✔
535
end
536

537
# Default Host to localhost
538

539
"""
540
    connect([host], port::Integer) -> TCPSocket
541

542
Connect to the host `host` on port `port`.
543
"""
544
connect(sock::TCPSocket, port::Integer) = connect(sock, localhost, port)
×
545
connect(port::Integer) = connect(localhost, port)
623✔
546

547
# Valid connect signatures for TCP
548
connect(host::AbstractString, port::Integer) = connect(TCPSocket(), host, port)
×
549
connect(addr::IPAddr, port::Integer) = connect(TCPSocket(), addr, port)
623✔
550
connect(addr::InetAddr) = connect(TCPSocket(), addr)
×
551

552
function connect!(sock::TCPSocket, host::AbstractString, port::Integer)
×
553
    if sock.status != StatusInit
×
554
        error("TCPSocket is not in initialization state")
×
555
    end
556
    ipaddr = getaddrinfo(host)
×
557
    connect!(sock, ipaddr, port)
×
558
    return sock
×
559
end
560

561
function connect(sock::LibuvStream, args...)
562
    connect!(sock, args...)
1,247✔
563
    wait_connected(sock)
1,247✔
564
    return sock
1,247✔
565
end
566

567
"""
568
    nagle(socket::Union{TCPServer, TCPSocket}, enable::Bool)
569

570
Nagle's algorithm batches multiple small TCP packets into larger
571
ones. This can improve throughput but worsen latency. Nagle's algorithm
572
is enabled by default. This function sets whether Nagle's algorithm is
573
active on a given TCP server or socket. The opposite option is called
574
`TCP_NODELAY` in other languages.
575

576
!!! compat "Julia 1.3"
577
    This function requires Julia 1.3 or later.
578
"""
579
function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool)
112✔
580
    # disable or enable Nagle's algorithm on all OSes
581
    iolock_begin()
112✔
582
    check_open(sock)
112✔
583
    err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable))
112✔
584
    # TODO: check err
585
    iolock_end()
112✔
586
    return err
112✔
587
end
588

589
"""
590
    quickack(socket::Union{TCPServer, TCPSocket}, enable::Bool)
591

592
On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`.
593
"""
594
function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool)
112✔
595
    iolock_begin()
112✔
596
    check_open(sock)
112✔
597
    @static if Sys.islinux()
598
        # tcp_quickack is a linux only option
599
        if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0
112✔
600
            @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
×
601
        end
602
    end
603
    iolock_end()
112✔
604
    nothing
112✔
605
end
606

607

608
##
609

610
const BACKLOG_DEFAULT = 511
611

612
"""
613
    listen([addr, ]port::Integer; backlog::Integer=BACKLOG_DEFAULT) -> TCPServer
614

615
Listen on port on the address specified by `addr`.
616
By default this listens on `localhost` only.
617
To listen on all interfaces pass `IPv4(0)` or `IPv6(0)` as appropriate.
618
`backlog` determines how many connections can be pending (not having
619
called [`accept`](@ref)) before the server will begin to
620
reject them. The default value of `backlog` is 511.
621
"""
622
function listen(addr; backlog::Integer=BACKLOG_DEFAULT)
×
623
    sock = TCPServer()
×
624
    bind(sock, addr) || error("cannot bind to port; may already be in use or access denied")
×
625
    listen(sock; backlog=backlog)
×
626
    return sock
×
627
end
628
listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(localhost, port; backlog=backlog)
×
629
listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host, port); backlog=backlog)
×
630

631
function listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
621✔
632
    uv_error("listen", trylisten(sock; backlog=backlog))
621✔
633
    return sock
621✔
634
end
635

636
# from `listen`
637
function uv_connectioncb(stream::Ptr{Cvoid}, status::Cint)
1,298✔
638
    sock = @handle_as stream LibuvServer
1,298✔
639
    lock(sock.cond)
2,596✔
640
    try
1,298✔
641
        if status >= 0
1,298✔
642
            notify(sock.cond)
1,298✔
643
        else
644
            notify_error(sock.cond, _UVError("connection", status))
1,298✔
645
        end
646
    finally
647
        unlock(sock.cond)
2,596✔
648
    end
649
    nothing
1,298✔
650
end
651

652
function trylisten(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
2,602✔
653
    iolock_begin()
1,301✔
654
    check_open(sock)
1,301✔
655
    err = ccall(:uv_listen, Cint, (Ptr{Cvoid}, Cint, Ptr{Cvoid}),
1,301✔
656
                sock, backlog, @cfunction(uv_connectioncb, Cvoid, (Ptr{Cvoid}, Cint)))
657
    sock.status = StatusActive
1,301✔
658
    iolock_end()
1,301✔
659
    return err
1,301✔
660
end
661

662
##
663

664
function accept_nonblock(server::TCPServer, client::TCPSocket)
665
    iolock_begin()
1,409✔
666
    if client.status != StatusInit
1,409✔
667
        error("client TCPSocket is not in initialization state")
×
668
    end
669
    err = ccall(:uv_accept, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), server.handle, client.handle)
1,409✔
670
    if err == 0
1,409✔
671
        client.status = StatusOpen
677✔
672
    end
673
    iolock_end()
1,409✔
674
    return err
1,409✔
675
end
676

677
function accept_nonblock(server::TCPServer)
×
678
    client = TCPSocket()
×
679
    uv_error("accept", accept_nonblock(server, client))
×
680
    return client
×
681
end
682

683
function accept(server::LibuvServer, client::LibuvStream)
1,353✔
684
    iolock_begin()
1,353✔
685
    if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed
1,353✔
686
        throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
×
687
    end
688
    while isopen(server)
2,651✔
689
        err = accept_nonblock(server, client)
2,651✔
690
        if err == 0
2,651✔
691
            iolock_end()
1,298✔
692
            return client
1,298✔
693
        elseif err != UV_EAGAIN
1,353✔
694
            uv_error("accept", err)
×
695
        end
696
        preserve_handle(server)
1,353✔
697
        lock(server.cond)
1,353✔
698
        iolock_end()
1,353✔
699
        try
1,353✔
700
            wait(server.cond)
1,353✔
701
        finally
702
            unlock(server.cond)
1,298✔
703
            unpreserve_handle(server)
1,298✔
704
        end
705
        iolock_begin()
1,298✔
706
    end
1,298✔
707
    uv_error("accept", UV_ECONNABORTED)
×
708
    nothing
×
709
end
710

711
## Utility functions
712

713
const localhost = ip"127.0.0.1"
714

715
"""
716
    listenany([host::IPAddr,] port_hint; backlog::Integer=BACKLOG_DEFAULT) -> (UInt16, TCPServer)
717

718
Create a `TCPServer` on any port, using hint as a starting point. Returns a tuple of the
719
actual port that the server was created on and the server itself.
720
The backlog argument defines the maximum length to which the queue of pending connections for sockfd may grow.
721
"""
722
function listenany(host::IPAddr, default_port; backlog::Integer=BACKLOG_DEFAULT)
1,358✔
723
    addr = InetAddr(host, default_port)
679✔
724
    while true
680✔
725
        sock = TCPServer()
680✔
726
        if bind(sock, addr) && trylisten(sock; backlog) == 0
680✔
727
            if default_port == 0
679✔
728
                _addr, port = getsockname(sock)
×
729
                return (port, sock)
×
730
            end
731
            return (addr.port, sock)
679✔
732
        end
733
        close(sock)
1✔
734
        addr = InetAddr(addr.host, addr.port + UInt16(1))
1✔
735
        if addr.port == default_port
1✔
736
            error("no ports available")
×
737
        end
738
    end
1✔
739
end
740

741
listenany(default_port; backlog::Integer=BACKLOG_DEFAULT) = listenany(localhost, default_port; backlog)
1,248✔
742

743
function udp_set_membership(sock::UDPSocket, group_addr::String,
×
744
                            interface_addr::Union{Nothing, String}, operation)
745
    if interface_addr === nothing
×
746
        interface_addr = C_NULL
×
747
    end
748
    r = ccall(:uv_udp_set_membership, Cint,
×
749
              (Ptr{Cvoid}, Cstring, Cstring, Cint),
750
              sock.handle, group_addr, interface_addr, operation)
751
    uv_error("uv_udp_set_membership", r)
×
752
    return
×
753
end
754

755
"""
756
    join_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
757

758
Join a socket to a particular multicast group defined by `group_addr`.
759
If `interface_addr` is given, specifies a particular interface for multi-homed
760
systems.  Use `leave_multicast_group()` to disable reception of a group.
761
"""
762
function join_multicast_group(sock::UDPSocket, group_addr::String,
×
763
                              interface_addr::Union{Nothing, String} = nothing)
764
    return udp_set_membership(sock, group_addr, interface_addr, 1)
×
765
end
766
function join_multicast_group(sock::UDPSocket, group_addr::IPAddr,
×
767
                              interface_addr::Union{Nothing, IPAddr} = nothing)
768
    if interface_addr !== nothing
×
769
        interface_addr = string(interface_addr)
×
770
    end
771
    return join_multicast_group(sock, string(group_addr), interface_addr)
×
772
end
773

774
"""
775
    leave_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
776

777
Remove a socket from  a particular multicast group defined by `group_addr`.
778
If `interface_addr` is given, specifies a particular interface for multi-homed
779
systems.  Use `join_multicast_group()` to enable reception of a group.
780
"""
781
function leave_multicast_group(sock::UDPSocket, group_addr::String,
×
782
                               interface_addr::Union{Nothing, String} = nothing)
783
    return udp_set_membership(sock, group_addr, interface_addr, 0)
×
784
end
785
function leave_multicast_group(sock::UDPSocket, group_addr::IPAddr,
×
786
                               interface_addr::Union{Nothing, IPAddr} = nothing)
787
    if interface_addr !== nothing
×
788
        interface_addr = string(interface_addr)
×
789
    end
790
    return leave_multicast_group(sock, string(group_addr), interface_addr)
×
791
end
792

793
"""
794
    getsockname(sock::Union{TCPServer, TCPSocket}) -> (IPAddr, UInt16)
795

796
Get the IP address and port that the given socket is bound to.
797
"""
798
getsockname(sock::Union{TCPSocket, TCPServer}) = _sockname(sock, true)
3✔
799

800

801
"""
802
    getpeername(sock::TCPSocket) -> (IPAddr, UInt16)
803

804
Get the IP address and port of the remote endpoint that the given
805
socket is connected to. Valid only for connected TCP sockets.
806
"""
807
getpeername(sock::TCPSocket) = _sockname(sock, false)
×
808

809
function _sockname(sock, self=true)
3✔
810
    sock.status == StatusInit || check_open(sock)
3✔
811
    rport = Ref{Cushort}(0)
3✔
812
    raddress = zeros(UInt8, 16)
3✔
813
    rfamily = Ref{Cuint}(0)
3✔
814

815
    iolock_begin()
3✔
816
    if self
3✔
817
        r = ccall(:jl_tcp_getsockname, Int32,
3✔
818
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
819
                sock.handle, rport, raddress, rfamily)
820
    else
821
        r = ccall(:jl_tcp_getpeername, Int32,
×
822
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
823
                sock.handle, rport, raddress, rfamily)
824
    end
825
    iolock_end()
3✔
826
    uv_error("cannot obtain socket name", r)
3✔
827
    port = ntoh(rport[])
3✔
828
    af_inet6 = @static if Sys.iswindows() # AF_INET6 in <sys/socket.h>
829
        23
830
    elseif Sys.isapple()
831
        30
832
    elseif Sys.KERNEL ∈ (:FreeBSD, :DragonFly)
833
        28
834
    elseif Sys.KERNEL ∈ (:NetBSD, :OpenBSD)
835
        24
836
    else
837
        10
3✔
838
    end
839

840
    if rfamily[] == 2 # AF_INET
3✔
841
        addrv4 = raddress[1:4]
6✔
842
        naddr = ntoh(unsafe_load(Ptr{Cuint}(pointer(addrv4)), 1))
3✔
843
        addr = IPv4(naddr)
3✔
844
    elseif rfamily[] == af_inet6
×
845
        naddr = ntoh(unsafe_load(Ptr{UInt128}(pointer(raddress)), 1))
×
846
        addr = IPv6(naddr)
×
847
    else
848
        error(string("unsupported address family: ", rfamily[]))
×
849
    end
850
    return addr, port
3✔
851
end
852

853
# domain sockets
854

855
include("PipeServer.jl")
856

857
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc