• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37525

pending completion
#37525

push

local

web-flow
NFC: some cleanup in gc.c (#49577)

71766 of 83435 relevant lines covered (86.01%)

34298284.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.08
/stdlib/Sockets/src/Sockets.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
4
Support for sockets. Provides [`IPAddr`](@ref) and subtypes, [`TCPSocket`](@ref), and [`UDPSocket`](@ref).
5
"""
6
module Sockets
7

8
export
9
    accept,
10
    bind,
11
    connect,
12
    getaddrinfo,
13
    getalladdrinfo,
14
    getnameinfo,
15
    getipaddr,
16
    getipaddrs,
17
    islinklocaladdr,
18
    getpeername,
19
    getsockname,
20
    listen,
21
    listenany,
22
    recv,
23
    recvfrom,
24
    send,
25
    join_multicast_group,
26
    leave_multicast_group,
27
    TCPSocket,
28
    UDPSocket,
29
    @ip_str,
30
    IPAddr,
31
    IPv4,
32
    IPv6
33

34
import Base: isless, show, print, parse, bind, convert, isreadable, iswritable, alloc_buf_hook, _uv_hook_close
35

36
using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize,
37
    notify_error, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError,
38
    eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive,
39
    preserve_handle, unpreserve_handle, iolock_begin, iolock_end,
40
    uv_status_string, check_open, OS_HANDLE, RawFD,
41
    UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL,
42
    UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS,
43
    UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL,
44
    UV_EAI_FAMILY, UV_EAI_NODATA, UV_EAI_NONAME,
45
    UV_EAI_OVERFLOW, UV_EAI_PROTOCOL, UV_EAI_SERVICE,
46
    UV_EAI_SOCKTYPE, UV_EAI_MEMORY
47

48
include("IPAddr.jl")
49
include("addrinfo.jl")
50

51
"""
52
    TCPSocket(; delay=true)
53

54
Open a TCP socket using libuv. If `delay` is true, libuv delays creation of the
55
socket's file descriptor till the first [`bind`](@ref) call. `TCPSocket` has various
56
fields to denote the state of the socket as well as its send/receive buffers.
57
"""
58
mutable struct TCPSocket <: LibuvStream
59
    handle::Ptr{Cvoid}
60
    status::Int
61
    buffer::IOBuffer
62
    cond::Base.ThreadSynchronizer
63
    readerror::Any
64
    sendbuf::Union{IOBuffer, Nothing}
65
    lock::ReentrantLock # advisory lock
66
    throttle::Int
67

68
    function TCPSocket(handle::Ptr{Cvoid}, status)
1,318✔
69
        tcp = new(
1,318✔
70
                handle,
71
                status,
72
                PipeBuffer(),
73
                Base.ThreadSynchronizer(),
74
                nothing,
75
                nothing,
76
                ReentrantLock(),
77
                Base.DEFAULT_READ_BUFFER_SZ)
78
        associate_julia_struct(tcp.handle, tcp)
1,318✔
79
        finalizer(uvfinalize, tcp)
1,318✔
80
        return tcp
1,318✔
81
    end
82
end
83

84
# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
85
function TCPSocket(; delay=true)
2,636✔
86
    tcp = TCPSocket(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
1,318✔
87
    af_spec = delay ? 0 : 2   # AF_UNSPEC is 0, AF_INET is 2
1,318✔
88
    iolock_begin()
1,318✔
89
    err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
1,318✔
90
                eventloop(), tcp.handle, af_spec)
91
    uv_error("failed to create tcp socket", err)
1,318✔
92
    tcp.status = StatusInit
1,318✔
93
    iolock_end()
1,318✔
94
    return tcp
1,318✔
95
end
96

97
function TCPSocket(fd::OS_HANDLE)
×
98
    tcp = TCPSocket()
×
99
    iolock_begin()
×
100
    err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), tcp.handle, fd)
×
101
    uv_error("tcp_open", err)
×
102
    tcp.status = StatusOpen
×
103
    iolock_end()
×
104
    return tcp
×
105
end
106
if OS_HANDLE != RawFD
107
    TCPSocket(fd::RawFD) = TCPSocket(Libc._get_osfhandle(fd))
×
108
end
109

110

111
mutable struct TCPServer <: LibuvServer
112
    handle::Ptr{Cvoid}
113
    status::Int
114
    cond::Base.ThreadSynchronizer
115

116
    function TCPServer(handle::Ptr{Cvoid}, status)
540✔
117
        tcp = new(
540✔
118
            handle,
119
            status,
120
            Base.ThreadSynchronizer())
121
        associate_julia_struct(tcp.handle, tcp)
540✔
122
        finalizer(uvfinalize, tcp)
540✔
123
        return tcp
540✔
124
    end
125
end
126

127
# Keyword arg "delay": if true, libuv delays creation of socket fd till bind.
128
# It can be set to false if there is a need to set socket options before
129
# further calls to `bind` and `listen`, e.g. `SO_REUSEPORT`.
130
function TCPServer(; delay=true)
1,080✔
131
    tcp = TCPServer(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit)
540✔
132
    af_spec = delay ? 0 : 2   # AF_UNSPEC is 0, AF_INET is 2
540✔
133
    iolock_begin()
540✔
134
    err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint),
540✔
135
                eventloop(), tcp.handle, af_spec)
136
    uv_error("failed to create tcp server", err)
540✔
137
    tcp.status = StatusInit
540✔
138
    iolock_end()
540✔
139
    return tcp
540✔
140
end
141

142
"""
143
    accept(server[, client])
144

145
Accepts a connection on the given server and returns a connection to the client. An
146
uninitialized client stream may be provided, in which case it will be used instead of
147
creating a new stream.
148
"""
149
accept(server::TCPServer) = accept(server, TCPSocket())
714✔
150

151
function accept(callback, server::LibuvServer)
×
152
    task = @async try
×
153
            while true
×
154
                client = accept(server)
×
155
                callback(client)
×
156
            end
×
157
        catch ex
158
            # accept below may explicitly throw UV_ECONNABORTED:
159
            # filter that out since we expect that error
160
            if !(ex isa IOError && ex.code == UV_ECONNABORTED) || isopen(server)
×
161
                rethrow()
×
162
            end
163
        end
164
    return task # caller is responsible for checking for errors
×
165
end
166

167

168
# UDP
169
"""
170
    UDPSocket()
171

172
Open a UDP socket using libuv. `UDPSocket` has various
173
fields to denote the state of the socket.
174
"""
175
mutable struct UDPSocket <: LibuvStream
176
    handle::Ptr{Cvoid}
177
    status::Int
178
    recvnotify::Base.ThreadSynchronizer
179
    cond::Base.ThreadSynchronizer
180

181
    function UDPSocket(handle::Ptr{Cvoid}, status)
×
182
        cond = Base.ThreadSynchronizer()
×
183
        udp = new(handle, status, Base.ThreadSynchronizer(cond.lock), cond)
×
184
        associate_julia_struct(udp.handle, udp)
×
185
        finalizer(uvfinalize, udp)
×
186
        return udp
×
187
    end
188
end
189
function UDPSocket()
×
190
    this = UDPSocket(Libc.malloc(Base._sizeof_uv_udp), StatusUninit)
×
191
    iolock_begin()
×
192
    err = ccall(:uv_udp_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}),
×
193
                eventloop(), this.handle)
194
    uv_error("failed to create udp socket", err)
×
195
    this.status = StatusInit
×
196
    iolock_end()
×
197
    return this
×
198
end
199

200
show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")")
×
201

202
function _uv_hook_close(sock::UDPSocket)
×
203
    lock(sock.cond)
×
204
    try
×
205
        sock.status = StatusClosed
×
206
        notify(sock.cond)
×
207
        notify_error(sock.recvnotify, EOFError())
×
208
    finally
209
        unlock(sock.cond)
×
210
    end
211
    nothing
×
212
end
213

214
# Disables dual stack mode.
215
const UV_TCP_IPV6ONLY = 1
216

217
# Disables dual stack mode. Only available when using ipv6 binf
218
const UV_UDP_IPV6ONLY = 1
219

220
# Indicates message was truncated because read buffer was too small. The
221
# remainder was discarded by the OS.
222
const UV_UDP_PARTIAL = 2
223

224
# Indicates if SO_REUSEADDR will be set when binding the handle in uv_udp_bind. This sets
225
# the SO_REUSEPORT socket flag on the BSDs and OS X. On other Unix platforms, it sets the
226
# SO_REUSEADDR flag. What that means is that multiple threads or processes can bind to the
227
# same address without error (provided they all set the flag) but only the last one to bind
228
# will receive any traffic, in effect "stealing" the port from the previous listener.
229
const UV_UDP_REUSEADDR = 4
230

231
##
232

233
function _bind(sock::Union{TCPServer, TCPSocket}, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
716✔
234
    host_in = Ref(hton(host.host))
716✔
235
    return ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
716✔
236
            sock, hton(port), host_in, flags, host isa IPv6)
237
end
238

239
function _bind(sock::UDPSocket, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0))
×
240
    host_in = Ref(hton(host.host))
×
241
    return ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
×
242
            sock, hton(port), host_in, flags, host isa IPv6)
243
end
244

245
"""
246
    bind(socket::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only=false, reuseaddr=false, kws...)
247

248
Bind `socket` to the given `host:port`. Note that `0.0.0.0` will listen on all devices.
249

250
* The `ipv6only` parameter disables dual stack mode. If `ipv6only=true`, only an IPv6 stack is created.
251
* If `reuseaddr=true`, multiple threads or processes can bind to the same address without error
252
  if they all set `reuseaddr=true`, but only the last to bind will receive any traffic.
253
"""
254
function bind(sock::Union{TCPServer, UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only = false, reuseaddr = false, kws...)
1,432✔
255
    if sock.status != StatusInit
716✔
256
        error("$(typeof(sock)) is not in initialization state")
×
257
    end
258
    flags = 0
716✔
259
    if isa(host, IPv6) && ipv6only
716✔
260
        flags |= isa(sock, UDPSocket) ? UV_UDP_IPV6ONLY : UV_TCP_IPV6ONLY
×
261
    end
262
    if isa(sock, UDPSocket) && reuseaddr
716✔
263
        flags |= UV_UDP_REUSEADDR
×
264
    end
265
    iolock_begin()
716✔
266
    err = _bind(sock, host, UInt16(port), UInt32(flags))
716✔
267
    if err < 0
716✔
268
        iolock_end()
×
269
        if err != UV_EADDRINUSE && err != UV_EACCES && err != UV_EADDRNOTAVAIL
×
270
            #TODO: this codepath is not currently tested
271
            throw(_UVError("bind", err))
×
272
        else
273
            return false
×
274
        end
275
    end
276
    if isa(sock, TCPServer) || isa(sock, UDPSocket)
716✔
277
        sock.status = StatusOpen
540✔
278
    end
279
    isa(sock, UDPSocket) && setopt(sock; kws...)
716✔
280
    iolock_end()
716✔
281
    return true
716✔
282
end
283

284
bind(sock::TCPServer, addr::InetAddr) = bind(sock, addr.host, addr.port)
540✔
285

286
"""
287
    setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
288

289
Set UDP socket options.
290

291
* `multicast_loop`: loopback for multicast packets (default: `true`).
292
* `multicast_ttl`: TTL for multicast packets (default: `nothing`).
293
* `enable_broadcast`: flag must be set to `true` if socket will be used for broadcast
294
  messages, or else the UDP system will return an access error (default: `false`).
295
* `ttl`: Time-to-live of packets sent on the socket (default: `nothing`).
296
"""
297
function setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
×
298
    iolock_begin()
×
299
    if sock.status == StatusUninit
×
300
        error("Cannot set options on uninitialized socket")
×
301
    end
302
    if multicast_loop !== nothing
×
303
        uv_error("multicast_loop", ccall(:uv_udp_set_multicast_loop, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_loop) < 0)
×
304
    end
305
    if multicast_ttl !== nothing
×
306
        uv_error("multicast_ttl", ccall(:uv_udp_set_multicast_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_ttl))
×
307
    end
308
    if enable_broadcast !== nothing
×
309
        uv_error("enable_broadcast", ccall(:uv_udp_set_broadcast, Cint, (Ptr{Cvoid}, Cint), sock.handle, enable_broadcast))
×
310
    end
311
    if ttl !== nothing
×
312
        uv_error("ttl", ccall(:uv_udp_set_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, ttl))
×
313
    end
314
    iolock_end()
×
315
    nothing
×
316
end
317

318
"""
319
    recv(socket::UDPSocket)
320

321
Read a UDP packet from the specified socket, and return the bytes received. This call blocks.
322
"""
323
function recv(sock::UDPSocket)
×
324
    addr, data = recvfrom(sock)
×
325
    return data
×
326
end
327

328
function uv_recvcb end
329

330
"""
331
    recvfrom(socket::UDPSocket) -> (host_port, data)
332

333
Read a UDP packet from the specified socket, returning a tuple of `(host_port, data)`, where
334
`host_port` will be an InetAddr{IPv4} or InetAddr{IPv6}, as appropriate.
335

336
!!! compat "Julia 1.3"
337
    Prior to Julia version 1.3, the first returned value was an address (`IPAddr`).
338
    In version 1.3 it was changed to an `InetAddr`.
339
"""
340
function recvfrom(sock::UDPSocket)
×
341
    iolock_begin()
×
342
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
343
    if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive
×
344
        error("UDPSocket is not initialized and open")
×
345
    end
346
    if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
×
347
        err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
×
348
                    sock,
349
                    @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
×
350
                    @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
×
351
        uv_error("recv_start", err)
×
352
    end
353
    sock.status = StatusActive
×
354
    lock(sock.recvnotify)
×
355
    iolock_end()
×
356
    try
×
357
        From = Union{InetAddr{IPv4}, InetAddr{IPv6}}
×
358
        Data = Vector{UInt8}
×
359
        from, data = wait(sock.recvnotify)::Tuple{From, Data}
×
360
        return (from, data)
×
361
    finally
362
        unlock(sock.recvnotify)
×
363
    end
364
end
365

366
alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), Int(size)) # size is always 64k from libuv
×
367

368
function uv_recvcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint)
×
369
    sock = @handle_as handle UDPSocket
×
370
    lock(sock.recvnotify)
×
371
    try
×
372
        buf_addr = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf)
×
373
        if nread == 0 && addr == C_NULL
×
374
            Libc.free(buf_addr)
×
375
        elseif nread < 0
×
376
            Libc.free(buf_addr)
×
377
            notify_error(sock.recvnotify, _UVError("recv", nread))
×
378
        elseif flags & UV_UDP_PARTIAL > 0
×
379
            Libc.free(buf_addr)
×
380
            notify_error(sock.recvnotify, "Partial message received")
×
381
        else
382
            buf_size = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf))
×
383
            if buf_size - nread < 16384 # waste at most 16k (note: buf_size is currently always 64k)
×
384
                buf = unsafe_wrap(Array, buf_addr, nread, own=true)
×
385
            else
386
                buf = Vector{UInt8}(undef, nread)
×
387
                GC.@preserve buf unsafe_copyto!(pointer(buf), buf_addr, nread)
×
388
                Libc.free(buf_addr)
×
389
            end
390
            # need to check the address type in order to convert to a Julia IPAddr
391
            host = IPv4(0)
×
392
            port = UInt16(0)
×
393
            if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1
×
394
                host = IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr)))
×
395
                port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr))
×
396
            elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1
×
397
                tmp = Ref{UInt128}(0)
×
398
                scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, tmp)
×
399
                host = IPv6(ntoh(tmp[]))
×
400
                port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr))
×
401
            end
402
            from = InetAddr(host, port)
×
403
            notify(sock.recvnotify, (from, buf), all=false)
×
404
        end
405
        if sock.status == StatusActive && isempty(sock.recvnotify)
×
406
            sock.status = StatusOpen
×
407
            ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock)
×
408
        end
409
    finally
410
        unlock(sock.recvnotify)
×
411
    end
412
    nothing
×
413
end
414

415
function _send_async(sock::UDPSocket, ipaddr::Union{IPv4, IPv6}, port::UInt16, buf)
×
416
    req = Libc.malloc(Base._sizeof_uv_udp_send)
×
417
    uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
×
418
    host_in = Ref(hton(ipaddr.host))
×
419
    err = ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}, Cint),
×
420
                req, sock, hton(port), host_in, buf, sizeof(buf),
421
                @cfunction(Base.uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint)),
×
422
                ipaddr isa IPv6)
423
    if err < 0
×
424
        Libc.free(req)
×
425
        uv_error("send", err)
×
426
    end
427
    return req
×
428
end
429

430
"""
431
    send(socket::UDPSocket, host::IPAddr, port::Integer, msg)
432

433
Send `msg` over `socket` to `host:port`.
434
"""
435
function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg)
×
436
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
437
    iolock_begin()
×
438
    if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive
×
439
        error("UDPSocket is not initialized and open")
×
440
    end
441
    uvw = _send_async(sock, ipaddr, UInt16(port), msg)
×
442
    ct = current_task()
×
443
    preserve_handle(ct)
×
444
    Base.sigatomic_begin()
×
445
    uv_req_set_data(uvw, ct)
×
446
    iolock_end()
×
447
    status = try
×
448
        Base.sigatomic_end()
×
449
        wait()::Cint
×
450
    finally
451
        Base.sigatomic_end()
×
452
        iolock_begin()
×
453
        ct.queue === nothing || list_deletefirst!(ct.queue, ct)
×
454
        if uv_req_data(uvw) != C_NULL
×
455
            # uvw is still alive,
456
            # so make sure we won't get spurious notifications later
457
            uv_req_set_data(uvw, C_NULL)
×
458
        else
459
            # done with uvw
460
            Libc.free(uvw)
×
461
        end
462
        iolock_end()
×
463
        unpreserve_handle(ct)
×
464
    end
465
    uv_error("send", status)
×
466
    nothing
×
467
end
468

469

470
#from `connect`
471
function uv_connectcb(conn::Ptr{Cvoid}, status::Cint)
1,030✔
472
    hand = ccall(:jl_uv_connect_handle, Ptr{Cvoid}, (Ptr{Cvoid},), conn)
1,030✔
473
    sock = @handle_as hand LibuvStream
1,030✔
474
    lock(sock.cond)
2,060✔
475
    try
1,030✔
476
        if status >= 0 # success
1,030✔
477
            if !(sock.status == StatusClosed || sock.status == StatusClosing)
2,060✔
478
                sock.status = StatusOpen
1,030✔
479
            end
480
        else
481
            sock.readerror = _UVError("connect", status) # TODO: perhaps we should not reuse readerror for this
×
482
            if !(sock.status == StatusClosed || sock.status == StatusClosing)
×
483
                ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand)
×
484
                sock.status = StatusClosing
×
485
            end
486
        end
487
        notify(sock.cond)
1,030✔
488
    finally
489
        unlock(sock.cond)
2,060✔
490
    end
491
    Libc.free(conn)
1,030✔
492
    nothing
1,030✔
493
end
494

495
function connect!(sock::TCPSocket, host::Union{IPv4, IPv6}, port::Integer)
604✔
496
    iolock_begin()
604✔
497
    if sock.status != StatusInit
604✔
498
        error("TCPSocket is not in initialization state")
×
499
    end
500
    if !(0 <= port <= typemax(UInt16))
604✔
501
        throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port"))
×
502
    end
503
    host_in = Ref(hton(host.host))
604✔
504
    uv_error("connect", ccall(:jl_tcp_connect, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cint),
604✔
505
                              sock, host_in, hton(UInt16(port)), @cfunction(uv_connectcb, Cvoid, (Ptr{Cvoid}, Cint)),
506
                              host isa IPv6))
507
    sock.status = StatusConnecting
604✔
508
    iolock_end()
604✔
509
    nothing
604✔
510
end
511

512
connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port)
×
513

514
function wait_connected(x::LibuvStream)
1,381✔
515
    iolock_begin()
1,381✔
516
    check_open(x)
1,381✔
517
    isopen(x) || x.readerror === nothing || throw(x.readerror)
1,381✔
518
    preserve_handle(x)
1,381✔
519
    lock(x.cond)
1,381✔
520
    try
1,381✔
521
        while x.status == StatusConnecting
2,411✔
522
            iolock_end()
1,030✔
523
            wait(x.cond)
1,030✔
524
            unlock(x.cond)
2,060✔
525
            iolock_begin()
1,030✔
526
            lock(x.cond)
1,030✔
527
        end
1,030✔
528
        isopen(x) || x.readerror === nothing || throw(x.readerror)
1,381✔
529
    finally
530
        unlock(x.cond)
2,762✔
531
        unpreserve_handle(x)
1,381✔
532
    end
533
    iolock_end()
1,381✔
534
    nothing
1,381✔
535
end
536

537
# Default Host to localhost
538

539
"""
540
    connect([host], port::Integer) -> TCPSocket
541

542
Connect to the host `host` on port `port`.
543
"""
544
connect(sock::TCPSocket, port::Integer) = connect(sock, localhost, port)
×
545
connect(port::Integer) = connect(localhost, port)
428✔
546

547
# Valid connect signatures for TCP
548
connect(host::AbstractString, port::Integer) = connect(TCPSocket(), host, port)
×
549
connect(addr::IPAddr, port::Integer) = connect(TCPSocket(), addr, port)
428✔
550
connect(addr::InetAddr) = connect(TCPSocket(), addr)
×
551

552
function connect!(sock::TCPSocket, host::AbstractString, port::Integer)
×
553
    if sock.status != StatusInit
×
554
        error("TCPSocket is not in initialization state")
×
555
    end
556
    ipaddr = getaddrinfo(host)
×
557
    connect!(sock, ipaddr, port)
×
558
    return sock
×
559
end
560

561
function connect(sock::LibuvStream, args...)
1,030✔
562
    connect!(sock, args...)
1,030✔
563
    wait_connected(sock)
1,030✔
564
    return sock
1,030✔
565
end
566

567
"""
568
    nagle(socket::Union{TCPServer, TCPSocket}, enable::Bool)
569

570
Enables or disables Nagle's algorithm on a given TCP server or socket.
571

572
!!! compat "Julia 1.3"
573
    This function requires Julia 1.3 or later.
574
"""
575
function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool)
462✔
576
    # disable or enable Nagle's algorithm on all OSes
577
    iolock_begin()
462✔
578
    check_open(sock)
462✔
579
    err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable))
462✔
580
    # TODO: check err
581
    iolock_end()
462✔
582
    return err
462✔
583
end
584

585
"""
586
    quickack(socket::Union{TCPServer, TCPSocket}, enable::Bool)
587

588
On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`.
589
"""
590
function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool)
462✔
591
    iolock_begin()
462✔
592
    check_open(sock)
462✔
593
    @static if Sys.islinux()
×
594
        # tcp_quickack is a linux only option
595
        if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0
462✔
596
            @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
×
597
        end
598
    end
599
    iolock_end()
462✔
600
    nothing
462✔
601
end
602

603

604
##
605

606
const BACKLOG_DEFAULT = 511
607

608
"""
609
    listen([addr, ]port::Integer; backlog::Integer=BACKLOG_DEFAULT) -> TCPServer
610

611
Listen on port on the address specified by `addr`.
612
By default this listens on `localhost` only.
613
To listen on all interfaces pass `IPv4(0)` or `IPv6(0)` as appropriate.
614
`backlog` determines how many connections can be pending (not having
615
called [`accept`](@ref)) before the server will begin to
616
reject them. The default value of `backlog` is 511.
617
"""
618
function listen(addr; backlog::Integer=BACKLOG_DEFAULT)
×
619
    sock = TCPServer()
×
620
    bind(sock, addr) || error("cannot bind to port; may already be in use or access denied")
×
621
    listen(sock; backlog=backlog)
×
622
    return sock
×
623
end
624
listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(localhost, port; backlog=backlog)
×
625
listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host, port); backlog=backlog)
×
626

627
function listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
852✔
628
    uv_error("listen", trylisten(sock; backlog=backlog))
426✔
629
    return sock
426✔
630
end
631

632
# from `listen`
633
function uv_connectioncb(stream::Ptr{Cvoid}, status::Cint)
1,029✔
634
    sock = @handle_as stream LibuvServer
1,029✔
635
    lock(sock.cond)
2,058✔
636
    try
1,029✔
637
        if status >= 0
1,029✔
638
            notify(sock.cond)
1,029✔
639
        else
640
            notify_error(sock.cond, _UVError("connection", status))
1,029✔
641
        end
642
    finally
643
        unlock(sock.cond)
2,058✔
644
    end
645
    nothing
1,029✔
646
end
647

648
function trylisten(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT)
1,932✔
649
    iolock_begin()
966✔
650
    check_open(sock)
966✔
651
    err = ccall(:uv_listen, Cint, (Ptr{Cvoid}, Cint, Ptr{Cvoid}),
966✔
652
                sock, backlog, @cfunction(uv_connectioncb, Cvoid, (Ptr{Cvoid}, Cint)))
653
    sock.status = StatusActive
966✔
654
    iolock_end()
966✔
655
    return err
966✔
656
end
657

658
##
659

660
function accept_nonblock(server::TCPServer, client::TCPSocket)
1,317✔
661
    iolock_begin()
1,317✔
662
    if client.status != StatusInit
1,317✔
663
        error("client TCPSocket is not in initialization state")
×
664
    end
665
    err = ccall(:uv_accept, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), server.handle, client.handle)
1,317✔
666
    if err == 0
1,317✔
667
        client.status = StatusOpen
603✔
668
    end
669
    iolock_end()
1,317✔
670
    return err
1,317✔
671
end
672

673
function accept_nonblock(server::TCPServer)
×
674
    client = TCPSocket()
×
675
    uv_error("accept", accept_nonblock(server, client))
×
676
    return client
×
677
end
678

679
function accept(server::LibuvServer, client::LibuvStream)
1,140✔
680
    iolock_begin()
1,140✔
681
    if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed
1,140✔
682
        throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
×
683
    end
684
    while isopen(server)
2,169✔
685
        err = accept_nonblock(server, client)
2,169✔
686
        if err == 0
2,169✔
687
            iolock_end()
1,029✔
688
            return client
1,029✔
689
        elseif err != UV_EAGAIN
1,140✔
690
            uv_error("accept", err)
×
691
        end
692
        preserve_handle(server)
1,140✔
693
        lock(server.cond)
1,140✔
694
        iolock_end()
1,140✔
695
        try
1,140✔
696
            wait(server.cond)
1,140✔
697
        finally
698
            unlock(server.cond)
2,058✔
699
            unpreserve_handle(server)
1,029✔
700
        end
701
        iolock_begin()
1,029✔
702
    end
1,029✔
703
    uv_error("accept", UV_ECONNABORTED)
×
704
    nothing
×
705
end
706

707
## Utility functions
708

709
const localhost = ip"127.0.0.1"
710

711
"""
712
    listenany([host::IPAddr,] port_hint; backlog::Integer=BACKLOG_DEFAULT) -> (UInt16, TCPServer)
713

714
Create a `TCPServer` on any port, using hint as a starting point. Returns a tuple of the
715
actual port that the server was created on and the server itself.
716
The backlog argument defines the maximum length to which the queue of pending connections for sockfd may grow.
717
"""
718
function listenany(host::IPAddr, default_port; backlog::Integer=BACKLOG_DEFAULT)
1,080✔
719
    addr = InetAddr(host, default_port)
540✔
720
    while true
540✔
721
        sock = TCPServer()
540✔
722
        if bind(sock, addr) && trylisten(sock; backlog) == 0
540✔
723
            if default_port == 0
540✔
724
                _addr, port = getsockname(sock)
×
725
                return (port, sock)
×
726
            end
727
            return (addr.port, sock)
540✔
728
        end
729
        close(sock)
×
730
        addr = InetAddr(addr.host, addr.port + UInt16(1))
×
731
        if addr.port == default_port
×
732
            error("no ports available")
×
733
        end
734
    end
×
735
end
736

737
listenany(default_port; backlog::Integer=BACKLOG_DEFAULT) = listenany(localhost, default_port; backlog)
858✔
738

739
function udp_set_membership(sock::UDPSocket, group_addr::String,
×
740
                            interface_addr::Union{Nothing, String}, operation)
741
    if interface_addr === nothing
×
742
        interface_addr = C_NULL
×
743
    end
744
    r = ccall(:uv_udp_set_membership, Cint,
×
745
              (Ptr{Cvoid}, Cstring, Cstring, Cint),
746
              sock.handle, group_addr, interface_addr, operation)
747
    uv_error("uv_udp_set_membership", r)
×
748
    return
×
749
end
750

751
"""
752
    join_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
753

754
Join a socket to a particular multicast group defined by `group_addr`.
755
If `interface_addr` is given, specifies a particular interface for multi-homed
756
systems.  Use `leave_multicast_group()` to disable reception of a group.
757
"""
758
function join_multicast_group(sock::UDPSocket, group_addr::String,
×
759
                              interface_addr::Union{Nothing, String} = nothing)
760
    return udp_set_membership(sock, group_addr, interface_addr, 1)
×
761
end
762
function join_multicast_group(sock::UDPSocket, group_addr::IPAddr,
×
763
                              interface_addr::Union{Nothing, IPAddr} = nothing)
764
    if interface_addr !== nothing
×
765
        interface_addr = string(interface_addr)
×
766
    end
767
    return join_multicast_group(sock, string(group_addr), interface_addr)
×
768
end
769

770
"""
771
    leave_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
772

773
Remove a socket from  a particular multicast group defined by `group_addr`.
774
If `interface_addr` is given, specifies a particular interface for multi-homed
775
systems.  Use `join_multicast_group()` to enable reception of a group.
776
"""
777
function leave_multicast_group(sock::UDPSocket, group_addr::String,
×
778
                               interface_addr::Union{Nothing, String} = nothing)
779
    return udp_set_membership(sock, group_addr, interface_addr, 0)
×
780
end
781
function leave_multicast_group(sock::UDPSocket, group_addr::IPAddr,
×
782
                               interface_addr::Union{Nothing, IPAddr} = nothing)
783
    if interface_addr !== nothing
×
784
        interface_addr = string(interface_addr)
×
785
    end
786
    return leave_multicast_group(sock, string(group_addr), interface_addr)
×
787
end
788

789
"""
790
    getsockname(sock::Union{TCPServer, TCPSocket}) -> (IPAddr, UInt16)
791

792
Get the IP address and port that the given socket is bound to.
793
"""
794
getsockname(sock::Union{TCPSocket, TCPServer}) = _sockname(sock, true)
176✔
795

796

797
"""
798
    getpeername(sock::TCPSocket) -> (IPAddr, UInt16)
799

800
Get the IP address and port of the remote endpoint that the given
801
socket is connected to. Valid only for connected TCP sockets.
802
"""
803
getpeername(sock::TCPSocket) = _sockname(sock, false)
×
804

805
function _sockname(sock, self=true)
196✔
806
    sock.status == StatusInit || check_open(sock)
216✔
807
    rport = Ref{Cushort}(0)
196✔
808
    raddress = zeros(UInt8, 16)
196✔
809
    rfamily = Ref{Cuint}(0)
196✔
810

811
    iolock_begin()
196✔
812
    if self
196✔
813
        r = ccall(:jl_tcp_getsockname, Int32,
196✔
814
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
815
                sock.handle, rport, raddress, rfamily)
816
    else
817
        r = ccall(:jl_tcp_getpeername, Int32,
×
818
                (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}),
819
                sock.handle, rport, raddress, rfamily)
820
    end
821
    iolock_end()
196✔
822
    uv_error("cannot obtain socket name", r)
196✔
823
    port = ntoh(rport[])
196✔
824
    af_inet6 = @static if Sys.iswindows() # AF_INET6 in <sys/socket.h>
×
825
        23
826
    elseif Sys.isapple()
827
        30
828
    elseif Sys.KERNEL ∈ (:FreeBSD, :DragonFly)
829
        28
830
    elseif Sys.KERNEL ∈ (:NetBSD, :OpenBSD)
831
        24
832
    else
833
        10
196✔
834
    end
835

836
    if rfamily[] == 2 # AF_INET
196✔
837
        addrv4 = raddress[1:4]
196✔
838
        naddr = ntoh(unsafe_load(Ptr{Cuint}(pointer(addrv4)), 1))
196✔
839
        addr = IPv4(naddr)
196✔
840
    elseif rfamily[] == af_inet6
×
841
        naddr = ntoh(unsafe_load(Ptr{UInt128}(pointer(raddress)), 1))
×
842
        addr = IPv6(naddr)
×
843
    else
844
        error(string("unsupported address family: ", rfamily[]))
×
845
    end
846
    return addr, port
196✔
847
end
848

849
# domain sockets
850

851
include("PipeServer.jl")
852

853
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc