• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #38002

06 Feb 2025 06:14AM UTC coverage: 20.322% (-2.4%) from 22.722%
#38002

push

local

web-flow
bpart: Fully switch to partitioned semantics (#57253)

This is the final PR in the binding partitions series (modulo bugs and
tweaks), i.e. it closes #54654 and thus closes #40399, which was the
original design sketch.

This thus activates the full designed semantics for binding partitions,
in particular allowing safe replacement of const bindings. It in
particular allows struct redefinitions. This thus closes
timholy/Revise.jl#18 and also closes #38584.

The biggest semantic change here is probably that this gets rid of the
notion of "resolvedness" of a binding. Previously, a lot of the behavior
of our implementation depended on when bindings were "resolved", which
could happen at basically an arbitrary point (in the compiler, in REPL
completion, in a different thread), making a lot of the semantics around
bindings ill- or at least implementation-defined. There are several
related issues in the bugtracker, so this closes #14055 closes #44604
closes #46354 closes #30277

It is also the last step to close #24569.
It also supports bindings for undef->defined transitions and thus closes
#53958 closes #54733 - however, this is not activated yet for
performance reasons and may need some further optimization.

Since resolvedness no longer exists, we need to replace it with some
hopefully more well-defined semantics. I will describe the semantics
below, but before I do I will make two notes:

1. There are a number of cases where these semantics will behave
slightly differently than the old semantics absent some other task going
around resolving random bindings.
2. The new behavior (except for the replacement stuff) was generally
permissible under the old semantics if the bindings happened to be
resolved at the right time.

With all that said, there are essentially three "strengths" of bindings:

1. Implicit Bindings: Anything implicitly obtained from `using Mod`, "no
binding", plus slightly more exotic corner cases around conflicts

2. Weakly declared bindin... (continued)

11 of 111 new or added lines in 7 files covered. (9.91%)

1273 existing lines in 68 files now uncovered.

9908 of 48755 relevant lines covered (20.32%)

105126.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.83
/base/stream.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
import .Libc: RawFD, dup
4
if Sys.iswindows()
5
    import .Libc: WindowsRawSocket
6
    const OS_HANDLE = WindowsRawSocket
7
    const INVALID_OS_HANDLE = WindowsRawSocket(Ptr{Cvoid}(-1))
8
else
9
    const OS_HANDLE = RawFD
10
    const INVALID_OS_HANDLE = RawFD(-1)
11
end
12

13

14
## types ##
15
abstract type IOServer end
16
"""
17
    LibuvServer
18

19
An abstract type for IOServers handled by libuv.
20

21
If `server isa LibuvServer`, it must obey the following interface:
22

23
- `server.handle` must be a `Ptr{Cvoid}`
24
- `server.status` must be an `Int`
25
- `server.cond` must be a `GenericCondition`
26
"""
27
abstract type LibuvServer <: IOServer end
28

29
function getproperty(server::LibuvServer, name::Symbol)
30
    if name === :handle
5✔
31
        return getfield(server, :handle)::Ptr{Cvoid}
24✔
32
    elseif name === :status
5✔
33
        return getfield(server, :status)::Int
52✔
34
    elseif name === :cond
×
35
        return getfield(server, :cond)::GenericCondition
×
36
    else
37
        return getfield(server, name)
×
38
    end
39
end
40

41
"""
42
    LibuvStream
43

44
An abstract type for IO streams handled by libuv.
45

46
If `stream isa LibuvStream`, it must obey the following interface:
47

48
- `stream.handle`, if present, must be a `Ptr{Cvoid}`
49
- `stream.status`, if present, must be an `Int`
50
- `stream.buffer`, if present, must be an `IOBuffer`
51
- `stream.sendbuf`, if present, must be a `Union{Nothing,IOBuffer}`
52
- `stream.cond`, if present, must be a `GenericCondition`
53
- `stream.lock`, if present, must be an `AbstractLock`
54
- `stream.throttle`, if present, must be an `Int`
55
"""
56
abstract type LibuvStream <: IO end
57

58
function getproperty(stream::LibuvStream, name::Symbol)
11✔
59
    if name === :handle
233,916✔
60
        return getfield(stream, :handle)::Ptr{Cvoid}
2,701✔
61
    elseif name === :status
233,884✔
62
        return getfield(stream, :status)::Int
9,454✔
63
    elseif name === :buffer
231,877✔
64
        return getfield(stream, :buffer)::IOBuffer
228,174✔
65
    elseif name === :sendbuf
4,477✔
66
        return getfield(stream, :sendbuf)::Union{Nothing,IOBuffer}
2,432✔
67
    elseif name === :cond
2,277✔
68
        return getfield(stream, :cond)::GenericCondition
376✔
69
    elseif name === :lock
1,981✔
70
        return getfield(stream, :lock)::AbstractLock
338✔
71
    elseif name === :throttle
1,941✔
72
        return getfield(stream, :throttle)::Int
103✔
73
    else
74
        return getfield(stream, name)
1,966✔
75
    end
76
end
77

78
# IO
79
# +- GenericIOBuffer{T<:AbstractVector{UInt8}} (not exported)
80
# +- AbstractPipe (not exported)
81
# .  +- Pipe
82
# .  +- Process (not exported)
83
# .  +- ProcessChain (not exported)
84
# +- DevNull (not exported)
85
# +- Filesystem.File
86
# +- LibuvStream (not exported)
87
# .  +- PipeEndpoint (not exported)
88
# .  +- TCPSocket
89
# .  +- TTY (not exported)
90
# .  +- UDPSocket
91
# .  +- BufferStream (FIXME: 2.0)
92
# +- IOBuffer = Base.GenericIOBuffer{Vector{UInt8}}
93
# +- IOStream
94

95
# IOServer
96
# +- LibuvServer
97
# .  +- PipeServer
98
# .  +- TCPServer
99

100
# Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported)
101

102
bytesavailable(s::LibuvStream) = bytesavailable(s.buffer)
618✔
103

104
function eof(s::LibuvStream)
105
    bytesavailable(s) > 0 && return false
242✔
106
    wait_readnb(s, 1)
242✔
107
    # This function is race-y if used from multiple threads, but we guarantee
108
    # it to never return true until the stream is definitively exhausted
109
    # and that we won't return true if there's a readerror pending (it'll instead get thrown).
110
    # This requires some careful ordering here (TODO: atomic loads)
111
    bytesavailable(s) > 0 && return false
239✔
112
    open = isreadable(s) # must precede readerror check
256✔
113
    s.readerror === nothing || throw(s.readerror)
128✔
114
    return !open
128✔
115
end
116

117
# Limit our default maximum read and buffer size,
118
# to avoid DoS-ing ourself into an OOM situation
119
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
120

121
# manually limit our write size, if the OS doesn't support full-size writes
122
if Sys.iswindows()
123
    const MAX_OS_WRITE = UInt(0x1FF0_0000) # 511 MB (determined semi-empirically, limited to 31 MB on XP)
124
else
125
    const MAX_OS_WRITE = UInt(0x7FFF_0000) # almost 2 GB (both macOS and linux have this kernel restriction, although only macOS documents it)
126
end
127

128

129
const StatusUninit      = 0 # handle is allocated, but not initialized
130
const StatusInit        = 1 # handle is valid, but not connected/active
131
const StatusConnecting  = 2 # handle is in process of connecting
132
const StatusOpen        = 3 # handle is usable
133
const StatusActive      = 4 # handle is listening for read/write/connect events
134
const StatusClosing     = 5 # handle is closing / being closed
135
const StatusClosed      = 6 # handle is closed
136
const StatusEOF         = 7 # handle is a TTY that has seen an EOF event (pretends to be closed until reseteof is called)
137
const StatusPaused      = 8 # handle is Active, but not consuming events, and will transition to Open if it receives an event
138
function uv_status_string(x)
139
    s = x.status
×
140
    if x.handle == C_NULL
×
141
        if s == StatusClosed
×
142
            return "closed"
×
143
        elseif s == StatusUninit
×
144
            return "null"
×
145
        end
146
        return "invalid status"
×
147
    elseif s == StatusUninit
×
148
        return "uninit"
×
149
    elseif s == StatusInit
×
150
        return "init"
×
151
    elseif s == StatusConnecting
×
152
        return "connecting"
×
153
    elseif s == StatusOpen
×
154
        return "open"
×
155
    elseif s == StatusActive
×
156
        return "active"
×
157
    elseif s == StatusPaused
×
158
        return "paused"
×
159
    elseif s == StatusClosing
×
160
        return "closing"
×
161
    elseif s == StatusClosed
×
162
        return "closed"
×
163
    elseif s == StatusEOF
×
164
        return "eof"
×
165
    end
166
    return "invalid status"
×
167
end
168

169
mutable struct PipeEndpoint <: LibuvStream
170
    handle::Ptr{Cvoid}
171
    status::Int
172
    buffer::IOBuffer
173
    cond::ThreadSynchronizer
174
    readerror::Any
175
    sendbuf::Union{IOBuffer, Nothing}
176
    lock::ReentrantLock # advisory lock
177
    throttle::Int
178
    function PipeEndpoint(handle::Ptr{Cvoid}, status)
179
        p = new(handle,
2✔
180
                status,
181
                PipeBuffer(),
182
                ThreadSynchronizer(),
183
                nothing,
184
                nothing,
185
                ReentrantLock(),
186
                DEFAULT_READ_BUFFER_SZ)
187
        associate_julia_struct(handle, p)
2✔
188
        finalizer(uvfinalize, p)
2✔
189
        return p
×
190
    end
191
end
192

193
function PipeEndpoint()
×
194
    pipe = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit)
×
195
    iolock_begin()
×
196
    err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0)
×
197
    uv_error("failed to create pipe endpoint", err)
×
198
    pipe.status = StatusInit
×
199
    iolock_end()
×
200
    return pipe
×
201
end
202

203
function PipeEndpoint(fd::OS_HANDLE)
×
204
    pipe = PipeEndpoint()
×
205
    iolock_begin()
×
206
    err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd)
×
207
    uv_error("pipe_open", err)
×
208
    pipe.status = StatusOpen
×
209
    iolock_end()
×
210
    return pipe
×
211
end
212
if OS_HANDLE != RawFD
213
    PipeEndpoint(fd::RawFD) = PipeEndpoint(Libc._get_osfhandle(fd))
×
214
end
215

216

217
mutable struct TTY <: LibuvStream
218
    handle::Ptr{Cvoid}
219
    status::Int
220
    buffer::IOBuffer
221
    cond::ThreadSynchronizer
222
    readerror::Any
223
    sendbuf::Union{IOBuffer, Nothing}
224
    lock::ReentrantLock # advisory lock
225
    throttle::Int
226
    @static if Sys.iswindows(); ispty::Bool; end
227
    function TTY(handle::Ptr{Cvoid}, status)
UNCOV
228
        tty = new(
×
229
            handle,
230
            status,
231
            PipeBuffer(),
232
            ThreadSynchronizer(),
233
            nothing,
234
            nothing,
235
            ReentrantLock(),
236
            DEFAULT_READ_BUFFER_SZ)
UNCOV
237
        associate_julia_struct(handle, tty)
×
UNCOV
238
        finalizer(uvfinalize, tty)
×
239
        @static if Sys.iswindows()
240
            tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Cvoid},), handle) != 0
241
        end
242
        return tty
×
243
    end
244
end
245

246
function TTY(fd::OS_HANDLE)
×
247
    tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit)
×
248
    iolock_begin()
×
249
    err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int32),
×
250
        eventloop(), tty.handle, fd, 0)
251
    uv_error("TTY", err)
×
252
    tty.status = StatusOpen
×
253
    iolock_end()
×
254
    return tty
×
255
end
256
if OS_HANDLE != RawFD
257
    TTY(fd::RawFD) = TTY(Libc._get_osfhandle(fd))
×
258
end
259

260
show(io::IO, stream::LibuvServer) = print(io, typeof(stream), "(",
×
261
    _fd(stream), " ",
262
    uv_status_string(stream), ")")
263
show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(",
×
264
    _fd(stream), " ",
265
    uv_status_string(stream), ", ",
266
    bytesavailable(stream.buffer), " bytes waiting)")
267

268
# Shared LibuvStream object interface
269

270
function isreadable(io::LibuvStream)
271
    bytesavailable(io) > 0 && return true
137✔
272
    isopen(io) || return false
265✔
273
    io.status == StatusEOF && return false
9✔
274
    return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0
9✔
275
end
276

277
function iswritable(io::LibuvStream)
278
    isopen(io) || return false
116✔
279
    io.status == StatusClosing && return false
×
280
    return ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), io.handle) != 0
×
281
end
282

283
lock(s::LibuvStream) = lock(s.lock)
169✔
284
unlock(s::LibuvStream) = unlock(s.lock)
169✔
285

286
setup_stdio(stream::LibuvStream, ::Bool) = (stream, false)
×
287
rawhandle(stream::LibuvStream) = stream.handle
×
288
unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer}) = s.handle
279✔
289

290
function init_stdio(handle::Ptr{Cvoid})
3✔
291
    iolock_begin()
3✔
292
    t = ccall(:jl_uv_handle_type, Int32, (Ptr{Cvoid},), handle)
3✔
293
    local io
294
    if t == UV_FILE
3✔
295
        fd = ccall(:jl_uv_file_handle, OS_HANDLE, (Ptr{Cvoid},), handle)
1✔
296
        # TODO: Replace ios.c file with libuv fs?
297
        # return File(fd)
298
        @static if Sys.iswindows()
299
            # TODO: Get ios.c to understand native handles
300
            fd = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), fd, 0)
301
        end
302
        # TODO: Get fdio to work natively with file descriptors instead of integers
303
        io = fdio(cconvert(Cint, fd))
1✔
304
    elseif t == UV_TTY
2✔
UNCOV
305
        io = TTY(handle, StatusOpen)
×
306
    elseif t == UV_TCP
2✔
307
        Sockets = require_stdlib(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets"))
×
308
        io = Sockets.TCPSocket(handle, StatusOpen)
×
309
    elseif t == UV_NAMED_PIPE
2✔
310
        io = PipeEndpoint(handle, StatusOpen)
2✔
311
    else
312
        throw(ArgumentError("invalid stdio type: $t"))
×
313
    end
314
    iolock_end()
3✔
315
    return io
3✔
316
end
317

318
"""
319
    open(fd::OS_HANDLE) -> IO
320

321
Take a raw file descriptor wrap it in a Julia-aware IO type,
322
and take ownership of the fd handle.
323
Call `open(Libc.dup(fd))` to avoid the ownership capture
324
of the original handle.
325

326
!!! warning
327
    Do not call this on a handle that's already owned by some
328
    other part of the system.
329
"""
330
function open(h::OS_HANDLE)
×
331
    iolock_begin()
×
332
    t = ccall(:uv_guess_handle, Cint, (OS_HANDLE,), h)
×
333
    local io
×
334
    if t == UV_FILE
×
335
        @static if Sys.iswindows()
×
336
            # TODO: Get ios.c to understand native handles
337
            h = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), h, 0)
×
338
        end
339
        # TODO: Get fdio to work natively with file descriptors instead of integers
340
        io = fdio(cconvert(Cint, h))
×
341
    elseif t == UV_TTY
×
342
        io = TTY(h)
×
343
    elseif t == UV_TCP
×
344
        Sockets = require_stdlib(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets"))
×
345
        io = Sockets.TCPSocket(h)
×
346
    elseif t == UV_NAMED_PIPE
×
347
        io = PipeEndpoint(h)
×
348
        @static if Sys.iswindows()
×
349
            if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), io.handle) != 0
×
350
                # replace the Julia `PipeEndpoint` type with a `TTY` type,
351
                # if we detect that this is a cygwin pty object
352
                pipe_handle, pipe_status = io.handle, io.status
×
353
                io.status = StatusClosed
×
354
                io.handle = C_NULL
×
355
                io = TTY(pipe_handle, pipe_status)
×
356
            end
357
        end
358
    else
359
        throw(ArgumentError("invalid stdio type: $t"))
×
360
    end
361
    iolock_end()
×
362
    return io
×
363
end
364

365
if OS_HANDLE != RawFD
366
    function open(fd::RawFD)
×
367
        h = Libc.dup(Libc._get_osfhandle(fd)) # make a dup to steal ownership away from msvcrt
×
368
        try
×
369
            io = open(h)
×
370
            ccall(:_close, Cint, (RawFD,), fd) # on success, destroy the old libc handle
×
371
            return io
×
372
        catch ex
373
            ccall(:CloseHandle, stdcall, Cint, (OS_HANDLE,), h) # on failure, destroy the new nt handle
×
374
            rethrow(ex)
×
375
        end
376
    end
377
end
378

379
function isopen(x::Union{LibuvStream, LibuvServer})
9✔
380
    if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
4,802✔
381
        throw(ArgumentError("$x is not initialized"))
×
382
    end
383
    return x.status != StatusClosed
2,401✔
384
end
385

386
function check_open(x::Union{LibuvStream, LibuvServer})
387
    if !isopen(x) || x.status == StatusClosing
554✔
388
        throw(IOError("stream is closed or unusable", 0))
×
389
    end
390
end
391

392
function wait_readnb(x::LibuvStream, nb::Int)
29✔
393
    # fast path before iolock acquire
394
    bytesavailable(x.buffer) >= nb && return
29✔
395
    open = isopen(x) && x.status != StatusEOF # must precede readerror check
29✔
396
    x.readerror === nothing || throw(x.readerror)
29✔
397
    open || return
29✔
398
    iolock_begin()
29✔
399
    # repeat fast path after iolock acquire, before other expensive work
400
    bytesavailable(x.buffer) >= nb && (iolock_end(); return)
29✔
401
    open = isopen(x) && x.status != StatusEOF
29✔
402
    x.readerror === nothing || throw(x.readerror)
29✔
403
    open || (iolock_end(); return)
29✔
404
    # now do the "real" work
405
    oldthrottle = x.throttle
29✔
406
    preserve_handle(x)
29✔
407
    lock(x.cond)
29✔
408
    try
29✔
409
        while bytesavailable(x.buffer) < nb
53✔
410
            x.readerror === nothing || throw(x.readerror)
33✔
411
            isopen(x) || break
33✔
412
            x.status == StatusEOF && break
33✔
413
            x.throttle = max(nb, x.throttle)
29✔
414
            start_reading(x) # ensure we are reading
29✔
415
            iolock_end()
29✔
416
            wait(x.cond)
29✔
417
            unlock(x.cond)
24✔
418
            iolock_begin()
24✔
419
            lock(x.cond)
24✔
420
        end
48✔
421
    finally
422
        if isempty(x.cond)
25✔
423
            stop_reading(x) # stop reading iff there are currently no other read clients of the stream
25✔
424
        end
425
        if oldthrottle <= x.throttle <= nb
25✔
426
            # if we're interleaving readers, we might not get back to the "original" throttle
427
            # but we consider that an acceptable "risk", since we can't be quite sure what the intended value is now
428
            x.throttle = oldthrottle
×
429
        end
430
        unpreserve_handle(x)
25✔
431
        unlock(x.cond)
25✔
432
    end
433
    iolock_end()
24✔
434
    nothing
24✔
435
end
436

437
function closewrite(s::LibuvStream)
58✔
438
    iolock_begin()
58✔
439
    if !iswritable(s)
58✔
440
        iolock_end()
58✔
441
        return
58✔
442
    end
443
    req = Libc.malloc(_sizeof_uv_shutdown)
×
444
    uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
×
445
    err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
×
446
                req, s, @cfunction(uv_shutdowncb_task, Cvoid, (Ptr{Cvoid}, Cint)))
447
    if err < 0
×
448
        Libc.free(req)
×
449
        uv_error("shutdown", err)
×
450
    end
451
    ct = current_task()
×
452
    preserve_handle(ct)
×
453
    sigatomic_begin()
×
454
    uv_req_set_data(req, ct)
×
455
    iolock_end()
×
456
    local status
457
    try
×
458
        sigatomic_end()
×
459
        status = wait()::Cint
×
460
        sigatomic_begin()
×
461
    finally
462
        # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
463
        sigatomic_end()
×
464
        iolock_begin()
×
465
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
×
466
        if uv_req_data(req) != C_NULL
×
467
            # req is still alive,
468
            # so make sure we won't get spurious notifications later
469
            uv_req_set_data(req, C_NULL)
×
470
        else
471
            # done with req
472
            Libc.free(req)
×
473
        end
474
        iolock_end()
×
475
        unpreserve_handle(ct)
×
476
    end
477
    if isopen(s)
×
478
        if status < 0 || ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), s.handle) == 0
×
479
            close(s)
×
480
        end
481
    end
482
    if status < 0
×
483
        throw(_UVError("shutdown", status))
×
484
    end
485
    nothing
×
486
end
487

488
function wait_close(x::Union{LibuvStream, LibuvServer})
8✔
489
    preserve_handle(x)
8✔
490
    lock(x.cond)
8✔
491
    try
8✔
492
        while isopen(x)
12✔
493
            wait(x.cond)
4✔
494
        end
4✔
495
    finally
496
        unlock(x.cond)
8✔
497
        unpreserve_handle(x)
8✔
498
    end
499
    nothing
8✔
500
end
501

502
function close(stream::Union{LibuvStream, LibuvServer})
8✔
503
    iolock_begin()
8✔
504
    if stream.status == StatusInit
8✔
505
        ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
×
506
        stream.status = StatusClosing
×
507
    elseif isopen(stream)
8✔
508
        if stream.status != StatusClosing
4✔
509
            ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
4✔
510
            stream.status = StatusClosing
4✔
511
        end
512
    end
513
    iolock_end()
8✔
514
    wait_close(stream)
8✔
515
    nothing
8✔
516
end
517

518
function uvfinalize(uv::Union{LibuvStream, LibuvServer})
×
519
    iolock_begin()
×
520
    if uv.handle != C_NULL
×
521
        disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
×
522
        if uv.status == StatusUninit
×
523
            Libc.free(uv.handle)
×
524
        elseif uv.status == StatusInit
×
525
            ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
×
526
        elseif isopen(uv)
×
527
            if uv.status != StatusClosing
×
528
                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
×
529
            end
530
        elseif uv.status == StatusClosed
×
531
            Libc.free(uv.handle)
×
532
        end
533
        uv.handle = C_NULL
×
534
        uv.status = StatusClosed
×
535
    end
536
    iolock_end()
×
537
    nothing
×
538
end
539

540
if Sys.iswindows()
541
    ispty(s::TTY) = s.ispty
×
542
    ispty(s::IO) = false
×
543
end
544

545
"""
546
    displaysize([io::IO]) -> (lines, columns)
547

548
Return the nominal size of the screen that may be used for rendering output to
549
this `IO` object.
550
If no input is provided, the environment variables `LINES` and `COLUMNS` are read.
551
If those are not set, a default size of `(24, 80)` is returned.
552

553
# Examples
554
```jldoctest
555
julia> withenv("LINES" => 30, "COLUMNS" => 100) do
556
           displaysize()
557
       end
558
(30, 100)
559
```
560

561
To get your TTY size,
562

563
```julia-repl
564
julia> displaysize(stdout)
565
(34, 147)
566
```
567
"""
568
displaysize(io::IO) = displaysize()
20✔
569
displaysize() = (parse(Int, get(ENV, "LINES",   "24")),
5✔
570
                 parse(Int, get(ENV, "COLUMNS", "80")))::Tuple{Int, Int}
571

572
# This is a fancy way to make de-specialize a call to `displaysize(io::IO)`
573
# which is unfortunately invalidated by REPL
574
#  (https://github.com/JuliaLang/julia/issues/56080)
575
#
576
# This makes the call less efficient, but avoids being invalidated by REPL.
577
displaysize_(io::IO) = Base.invoke_in_world(Base.tls_world_age(), displaysize, io)::Tuple{Int,Int}
3✔
578

UNCOV
579
function displaysize(io::TTY)
×
UNCOV
580
    check_open(io)
×
581

UNCOV
582
    local h::Int, w::Int
×
UNCOV
583
    default_size = displaysize()
×
584

UNCOV
585
    @static if Sys.iswindows()
×
UNCOV
586
        if ispty(io)
×
587
            # io is actually a libuv pipe but a cygwin/msys2 pty
UNCOV
588
            try
×
UNCOV
589
                h, w = parse.(Int, split(read(open(Base.Cmd(String["stty", "size"]), "r", io).out, String)))
×
UNCOV
590
                h > 0 || (h = default_size[1])
×
UNCOV
591
                w > 0 || (w = default_size[2])
×
UNCOV
592
                return h, w
×
593
            catch
UNCOV
594
                return default_size
×
595
            end
596
        end
597
    end
598

UNCOV
599
    s1 = Ref{Int32}(0)
×
UNCOV
600
    s2 = Ref{Int32}(0)
×
UNCOV
601
    iolock_begin()
×
UNCOV
602
    check_open(io)
×
UNCOV
603
    Base.uv_error("size (TTY)", ccall(:uv_tty_get_winsize,
×
604
                                      Int32, (Ptr{Cvoid}, Ptr{Int32}, Ptr{Int32}),
605
                                      io, s1, s2) != 0)
UNCOV
606
    iolock_end()
×
UNCOV
607
    w, h = s1[], s2[]
×
UNCOV
608
    h > 0 || (h = default_size[1])
×
UNCOV
609
    w > 0 || (w = default_size[2])
×
UNCOV
610
    return h, w
×
611
end
612

613
### Libuv callbacks ###
614

615
## BUFFER ##
616
## Allocate space in buffer (for immediate use)
617
function alloc_request(buffer::IOBuffer, recommended_size::UInt)
×
618
    ensureroom(buffer, Int(recommended_size))
×
619
    ptr = buffer.append ? buffer.size + 1 : buffer.ptr
×
620
    nb = min(length(buffer.data)-buffer.offset, buffer.maxsize) + buffer.offset - ptr + 1
×
621
    return (Ptr{Cvoid}(pointer(buffer.data, ptr)), nb)
×
622
end
623

624
notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Cvoid}, len::UInt) = notify_filled(buffer, nread)
×
625

626
function notify_filled(buffer::IOBuffer, nread::Int)
627
    if buffer.append
20✔
628
        buffer.size += nread
20✔
629
    else
630
        buffer.ptr += nread
×
631
        buffer.size = max(buffer.size, buffer.ptr - 1)
×
632
    end
633
    nothing
×
634
end
635

636
function alloc_buf_hook(stream::LibuvStream, size::UInt)
×
637
    throttle = UInt(stream.throttle)
×
638
    return alloc_request(stream.buffer, (size > throttle) ? throttle : size)
×
639
end
640

641
function uv_alloc_buf(handle::Ptr{Cvoid}, size::Csize_t, buf::Ptr{Cvoid})
×
642
    hd = uv_handle_data(handle)
×
643
    if hd == C_NULL
×
644
        ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, 0)
×
645
        return nothing
×
646
    end
647
    stream = unsafe_pointer_to_objref(hd)::LibuvStream
×
648

649
    local data::Ptr{Cvoid}, newsize::Csize_t
×
650
    if stream.status != StatusActive
×
651
        data = C_NULL
×
652
        newsize = 0
×
653
    else
654
        (data, newsize) = alloc_buf_hook(stream, UInt(size))
×
655
        if data == C_NULL
×
656
            newsize = 0
×
657
        end
658
        # avoid aliasing of `nread` with `errno` in uv_readcb
659
        # or exceeding the Win32 maximum uv_buf_t len
660
        maxsize = @static Sys.iswindows() ? typemax(Cint) : typemax(Cssize_t)
×
661
        newsize > maxsize && (newsize = maxsize)
×
662
    end
663

664
    ccall(:jl_uv_buf_set_base, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), buf, data)
×
665
    ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, newsize)
×
666
    nothing
×
667
end
668

669
function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
670
    stream_unknown_type = @handle_as handle LibuvStream
671
    nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
672
    function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
24✔
673
        lock(stream.cond)
24✔
674
        if nread < 0
24✔
675
            if nread == UV_ENOBUFS && nrequested == 0
4✔
676
                # remind the client that stream.buffer is full
677
                notify(stream.cond)
×
678
            elseif nread == UV_EOF # libuv called uv_stop_reading already
4✔
679
                if stream.status != StatusClosing
4✔
680
                    stream.status = StatusEOF
4✔
681
                    notify(stream.cond)
4✔
682
                    if stream isa TTY
4✔
683
                        # stream can still be used by reseteof (or possibly write)
684
                    elseif !(stream isa PipeEndpoint) && ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0
4✔
685
                        # stream can still be used by write
686
                    else
687
                        # underlying stream is no longer useful: begin finalization
688
                        ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
×
689
                        stream.status = StatusClosing
×
690
                    end
691
                end
692
            else
693
                stream.readerror = _UVError("read", nread)
×
694
                notify(stream.cond)
×
695
                # This is a fatal connection error
696
                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
×
697
                stream.status = StatusClosing
×
698
            end
699
        else
700
            notify_filled(stream.buffer, nread)
20✔
701
            notify(stream.cond)
20✔
702
        end
703
        unlock(stream.cond)
24✔
704

705
        # Stop background reading when
706
        # 1) there's nobody paying attention to the data we are reading
707
        # 2) we have accumulated a lot of unread data OR
708
        # 3) we have an alternate buffer that has reached its limit.
709
        if stream.status == StatusPaused ||
48✔
710
           (stream.status == StatusActive &&
711
            ((bytesavailable(stream.buffer) >= stream.throttle) ||
712
             (bytesavailable(stream.buffer) >= stream.buffer.maxsize)))
713
            # save cycles by stopping kernel notifications from arriving
714
            ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
×
715
            stream.status = StatusOpen
×
716
        end
717
        nothing
24✔
718
    end
719
    readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested))
720
    nothing
721
end
722

723
function reseteof(x::TTY)
×
724
    iolock_begin()
×
725
    if x.status == StatusEOF
×
726
        x.status = StatusOpen
×
727
    end
728
    iolock_end()
×
729
    nothing
×
730
end
731

732
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
4✔
733
    lock(uv.cond)
4✔
734
    try
4✔
735
        uv.status = StatusClosed
4✔
736
        # notify any listeners that exist on this libuv stream type
737
        notify(uv.cond)
4✔
738
    finally
739
        unlock(uv.cond)
4✔
740
    end
741
    nothing
4✔
742
end
743

744

745
##########################################
746
# Pipe Abstraction
747
#  (composed of two half-pipes: .in and .out)
748
##########################################
749

750
mutable struct Pipe <: AbstractPipe
751
    in::PipeEndpoint # writable
41✔
752
    out::PipeEndpoint # readable
753
end
754

755
"""
756
    Pipe()
757

758
Construct an uninitialized Pipe object, especially for IO communication between multiple processes.
759

760
The appropriate end of the pipe will be automatically initialized if the object is used in
761
process spawning. This can be useful to easily obtain references in process pipelines, e.g.:
762

763
```
764
julia> err = Pipe()
765

766
# After this `err` will be initialized and you may read `foo`'s
767
# stderr from the `err` pipe, or pass `err` to other pipelines.
768
julia> run(pipeline(pipeline(`foo`, stderr=err), `cat`), wait=false)
769

770
# Now destroy the write half of the pipe, so that the read half will get EOF
771
julia> closewrite(err)
772

773
julia> read(err, String)
774
"stderr messages"
775
```
776

777
See also [`Base.link_pipe!`](@ref).
778
"""
779
Pipe() = Pipe(PipeEndpoint(), PipeEndpoint())
41✔
780
pipe_reader(p::Pipe) = p.out
42✔
781
pipe_writer(p::Pipe) = p.in
2✔
782

783
"""
784
    link_pipe!(pipe; reader_supports_async=false, writer_supports_async=false)
785

786
Initialize `pipe` and link the `in` endpoint to the `out` endpoint. The keyword
787
arguments `reader_supports_async`/`writer_supports_async` correspond to
788
`OVERLAPPED` on Windows and `O_NONBLOCK` on POSIX systems. They should be `true`
789
unless they'll be used by an external program (e.g. the output of a command
790
executed with [`run`](@ref)).
791
"""
792
function link_pipe!(pipe::Pipe;
38✔
793
                    reader_supports_async = false,
794
                    writer_supports_async = false)
795
     link_pipe!(pipe.out, reader_supports_async, pipe.in, writer_supports_async)
38✔
796
     return pipe
38✔
797
end
798

799
show(io::IO, stream::Pipe) = print(io,
×
800
    "Pipe(",
801
    _fd(stream.in), " ",
802
    uv_status_string(stream.in), " => ",
803
    _fd(stream.out), " ",
804
    uv_status_string(stream.out), ", ",
805
    bytesavailable(stream), " bytes waiting)")
806

807

808
## Functions for PipeEndpoint and PipeServer ##
809

810
function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE)
×
811
    iolock_begin()
×
812
    if p.status != StatusInit
×
813
        error("pipe is already in use or has been closed")
×
814
    end
815
    err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle)
×
816
    uv_error("pipe_open", err)
×
817
    p.status = StatusOpen
×
818
    iolock_end()
×
819
    return p
×
820
end
821

822

823
function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool,
×
824
                    write_end::PipeEndpoint, writer_supports_async::Bool)
825
    rd, wr = link_pipe(reader_supports_async, writer_supports_async)
×
826
    try
×
827
        try
×
828
            open_pipe!(read_end, rd)
×
829
        catch
830
            close_pipe_sync(rd)
×
831
            rethrow()
×
832
        end
833
        open_pipe!(write_end, wr)
×
834
    catch
835
        close_pipe_sync(wr)
×
836
        rethrow()
×
837
    end
838
    nothing
×
839
end
840

841
function link_pipe(reader_supports_async::Bool, writer_supports_async::Bool)
×
842
    UV_NONBLOCK_PIPE = 0x40
×
843
    fildes = Ref{Pair{OS_HANDLE, OS_HANDLE}}(INVALID_OS_HANDLE => INVALID_OS_HANDLE) # read (in) => write (out)
×
844
    err = ccall(:uv_pipe, Int32, (Ptr{Pair{OS_HANDLE, OS_HANDLE}}, Cint, Cint),
×
845
                fildes,
846
                reader_supports_async * UV_NONBLOCK_PIPE,
847
                writer_supports_async * UV_NONBLOCK_PIPE)
848
    uv_error("pipe", err)
×
849
    return fildes[]
×
850
end
851

852
if Sys.iswindows()
853
    function close_pipe_sync(handle::WindowsRawSocket)
×
854
        ccall(:CloseHandle, stdcall, Cint, (WindowsRawSocket,), handle)
×
855
        nothing
×
856
    end
857
else
858
    function close_pipe_sync(handle::RawFD)
859
        ccall(:close, Cint, (RawFD,), handle)
×
860
        nothing
×
861
    end
862
end
863

864
## Functions for any LibuvStream ##
865

866
# flow control
867

868
function start_reading(stream::LibuvStream)
29✔
869
    iolock_begin()
29✔
870
    if stream.status == StatusOpen
29✔
871
        if !isreadable(stream)
18✔
872
            error("tried to read a stream that is not readable")
×
873
        end
874
        # libuv may call the alloc callback immediately
875
        # for a TTY on Windows, so ensure the status is set first
876
        stream.status = StatusActive
9✔
877
        ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
9✔
878
                    stream, @cfunction(uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
879
                    @cfunction(uv_readcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid})))
880
    elseif stream.status == StatusPaused
20✔
881
        stream.status = StatusActive
20✔
882
        ret = Int32(0)
20✔
883
    elseif stream.status == StatusActive
×
884
        ret = Int32(0)
×
885
    else
886
        ret = Int32(-1)
×
887
    end
888
    iolock_end()
29✔
889
    return ret
29✔
890
end
891

892
if Sys.iswindows()
893
    # the low performance version of stop_reading is required
894
    # on Windows due to a NT kernel bug that we can't use a blocking
895
    # stream for non-blocking (overlapped) calls,
896
    # and a ReadFile call blocking on one thread
897
    # causes all other operations on that stream to lockup
898
    function stop_reading(stream::LibuvStream)
×
899
        iolock_begin()
×
900
        if stream.status == StatusActive
×
901
            stream.status = StatusOpen
×
902
            ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
×
903
        end
904
        iolock_end()
×
905
        nothing
×
906
    end
907
else
908
    function stop_reading(stream::LibuvStream)
909
        iolock_begin()
25✔
910
        if stream.status == StatusActive
25✔
911
            stream.status = StatusPaused
21✔
912
        end
913
        iolock_end()
25✔
914
        nothing
24✔
915
    end
916
end
917

918
# bulk read / write
919

920
readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a)) = readbytes!(s, a, Int(nb))
×
921
function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
100,363✔
922
    iolock_begin()
100,363✔
923
    sbuf = s.buffer
100,363✔
924
    @assert sbuf.seekable == false
100,363✔
925
    @assert sbuf.maxsize >= nb
100,363✔
926

927
    function wait_locked(s, buf, nb)
200,677✔
928
        while bytesavailable(buf) < nb
102,037✔
929
            s.readerror === nothing || throw(s.readerror)
1,723✔
930
            isopen(s) || break
1,723✔
931
            s.status != StatusEOF || break
1,723✔
932
            iolock_end()
1,723✔
933
            wait_readnb(s, nb)
1,723✔
934
            iolock_begin()
1,723✔
935
        end
1,723✔
936
    end
937

938
    if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
100,363✔
939
        wait_locked(s, sbuf, nb)
99,906✔
940
    end
941
    if bytesavailable(sbuf) >= nb
100,363✔
942
        nread = readbytes!(sbuf, a, nb)
99,955✔
943
    else
944
        initsize = length(a)
408✔
945
        newbuf = PipeBuffer(a, maxsize=nb)
408✔
946
        newbuf.size = newbuf.offset # reset the write pointer to the beginning
408✔
947
        nread = try
408✔
948
            s.buffer = newbuf
408✔
949
            write(newbuf, sbuf)
816✔
950
            wait_locked(s, newbuf, nb)
408✔
951
            bytesavailable(newbuf)
408✔
952
        finally
953
            s.buffer = sbuf
408✔
954
        end
955
        _take!(a, _unsafe_take!(newbuf))
408✔
956
        length(a) >= initsize || resize!(a, initsize)
408✔
957
    end
958
    iolock_end()
100,363✔
959
    return nread
100,363✔
960
end
961

962
function read(stream::LibuvStream)
161✔
963
    wait_readnb(stream, typemax(Int))
164✔
964
    iolock_begin()
164✔
965
    bytes = take!(stream.buffer)
164✔
966
    iolock_end()
164✔
967
    return bytes
161✔
968
end
969

970
function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
126,288✔
971
    iolock_begin()
126,288✔
972
    sbuf = s.buffer
126,288✔
973
    @assert sbuf.seekable == false
126,288✔
974
    @assert sbuf.maxsize >= nb
126,288✔
975

976
    function wait_locked(s, buf, nb)
252,576✔
977
        while bytesavailable(buf) < nb
126,304✔
978
            s.readerror === nothing || throw(s.readerror)
24✔
979
            isopen(s) || throw(EOFError())
24✔
980
            s.status != StatusEOF || throw(EOFError())
28✔
981
            iolock_end()
20✔
982
            wait_readnb(s, nb)
20✔
983
            iolock_begin()
16✔
984
        end
16✔
985
    end
986

987
    if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
126,288✔
988
        wait_locked(s, sbuf, Int(nb))
126,288✔
989
    end
990
    if bytesavailable(sbuf) >= nb
126,280✔
991
        unsafe_read(sbuf, p, nb)
126,280✔
992
    else
993
        newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize=Int(nb))
×
994
        newbuf.size = newbuf.offset # reset the write pointer to the beginning
×
995
        try
×
996
            s.buffer = newbuf
×
997
            write(newbuf, sbuf)
×
998
            wait_locked(s, newbuf, Int(nb))
×
999
        finally
1000
            s.buffer = sbuf
×
1001
        end
1002
    end
1003
    iolock_end()
126,280✔
1004
    nothing
126,280✔
1005
end
1006

1007
function read(this::LibuvStream, ::Type{UInt8})
439✔
1008
    iolock_begin()
439✔
1009
    sbuf = this.buffer
439✔
1010
    @assert sbuf.seekable == false
439✔
1011
    while bytesavailable(sbuf) < 1
439✔
1012
        iolock_end()
1✔
1013
        eof(this) && throw(EOFError())
2✔
1014
        iolock_begin()
×
1015
    end
×
1016
    c = read(sbuf, UInt8)
438✔
1017
    iolock_end()
438✔
1018
    return c
438✔
1019
end
1020

1021
function readavailable(this::LibuvStream)
111✔
1022
    wait_readnb(this, 1) # unlike the other `read` family of functions, this one doesn't guarantee error reporting
111✔
1023
    iolock_begin()
111✔
1024
    buf = this.buffer
111✔
1025
    @assert buf.seekable == false
111✔
1026
    bytes = take!(buf)
111✔
1027
    iolock_end()
111✔
1028
    return bytes
111✔
1029
end
1030

1031
function copyuntil(out::IO, x::LibuvStream, c::UInt8; keep::Bool=false)
×
1032
    iolock_begin()
×
1033
    buf = x.buffer
×
1034
    @assert buf.seekable == false
×
1035
    if !occursin(c, buf) # fast path checks first
×
1036
        x.readerror === nothing || throw(x.readerror)
×
1037
        if isopen(x) && x.status != StatusEOF
×
1038
            preserve_handle(x)
×
1039
            lock(x.cond)
×
1040
            try
×
1041
                while !occursin(c, x.buffer)
×
1042
                    x.readerror === nothing || throw(x.readerror)
×
1043
                    isopen(x) || break
×
1044
                    x.status != StatusEOF || break
×
1045
                    start_reading(x) # ensure we are reading
×
1046
                    iolock_end()
×
1047
                    wait(x.cond)
×
1048
                    unlock(x.cond)
×
1049
                    iolock_begin()
×
1050
                    lock(x.cond)
×
1051
                end
×
1052
            finally
1053
                if isempty(x.cond)
×
1054
                    stop_reading(x) # stop reading iff there are currently no other read clients of the stream
×
1055
                end
1056
                unlock(x.cond)
×
1057
                unpreserve_handle(x)
×
1058
            end
1059
        end
1060
    end
1061
    copyuntil(out, buf, c; keep)
×
1062
    iolock_end()
×
1063
    return out
×
1064
end
1065

1066
uv_write(s::LibuvStream, p::Vector{UInt8}) = GC.@preserve p uv_write(s, pointer(p), UInt(sizeof(p)))
20✔
1067

1068
# caller must have acquired the iolock
1069
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
252✔
1070
    uvw = uv_write_async(s, p, n)
252✔
1071
    ct = current_task()
252✔
1072
    preserve_handle(ct)
252✔
1073
    sigatomic_begin()
252✔
1074
    uv_req_set_data(uvw, ct)
252✔
1075
    iolock_end()
252✔
1076
    local status
1077
    try
252✔
1078
        sigatomic_end()
252✔
1079
        # wait for the last chunk to complete (or error)
1080
        # assume that any errors would be sticky,
1081
        # (so we don't need to monitor the error status of the intermediate writes)
1082
        status = wait()::Cint
252✔
1083
        sigatomic_begin()
252✔
1084
    finally
1085
        # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
1086
        sigatomic_end()
252✔
1087
        iolock_begin()
252✔
1088
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
252✔
1089
        if uv_req_data(uvw) != C_NULL
252✔
1090
            # uvw is still alive,
1091
            # so make sure we won't get spurious notifications later
1092
            uv_req_set_data(uvw, C_NULL)
×
1093
        else
1094
            # done with uvw
1095
            Libc.free(uvw)
252✔
1096
        end
1097
        iolock_end()
252✔
1098
        unpreserve_handle(ct)
252✔
1099
    end
1100
    if status < 0
252✔
1101
        throw(_UVError("write", status))
×
1102
    end
1103
    return Int(n)
252✔
1104
end
1105

1106
# helper function for uv_write that returns the uv_write_t struct for the write
1107
# rather than waiting on it, caller must hold the iolock
1108
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
252✔
1109
    check_open(s)
252✔
1110
    while true
252✔
1111
        uvw = Libc.malloc(_sizeof_uv_write)
252✔
1112
        uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
252✔
1113
        nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
252✔
1114
        # TODO: use writev instead of a loop
1115
        err = ccall(:jl_uv_write,
252✔
1116
                    Int32,
1117
                    (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}),
1118
                    s, p, nwrite, uvw,
1119
                    @cfunction(uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint)))
1120
        if err < 0
252✔
1121
            Libc.free(uvw)
×
1122
            uv_error("write", err)
×
1123
        end
1124
        n -= nwrite
252✔
1125
        p += nwrite
252✔
1126
        if n == 0
252✔
1127
            return uvw
252✔
1128
        end
1129
    end
×
1130
end
1131

1132

1133
# Optimized send
1134
# - smaller writes are buffered, final uv write on flush or when buffer full
1135
# - large isbits arrays are unbuffered and written directly
1136

1137
function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
430✔
1138
    while true
198✔
1139
        # try to add to the send buffer
1140
        iolock_begin()
430✔
1141
        buf = s.sendbuf
430✔
1142
        buf === nothing && break
430✔
1143
        totb = bytesavailable(buf) + n
198✔
1144
        if totb < buf.maxsize
198✔
1145
            nb = unsafe_write(buf, p, n)
198✔
1146
            iolock_end()
198✔
1147
            return nb
198✔
1148
        end
1149
        bytesavailable(buf) == 0 && break
×
1150
        # perform flush(s)
1151
        arr = take!(buf)
×
1152
        uv_write(s, arr)
×
1153
    end
×
1154
    # perform the output to the kernel
1155
    return uv_write(s, p, n)
232✔
1156
end
1157

1158
function flush(s::LibuvStream)
24✔
1159
    iolock_begin()
24✔
1160
    buf = s.sendbuf
24✔
1161
    if buf !== nothing
24✔
1162
        if bytesavailable(buf) > 0
20✔
1163
            arr = take!(buf)
20✔
1164
            uv_write(s, arr)
20✔
1165
            return
20✔
1166
        end
1167
    end
1168
    uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue
4✔
1169
    return
4✔
1170
end
1171

1172
function buffer_writes(s::LibuvStream, bufsize)
1173
    sendbuf = PipeBuffer(bufsize)
8✔
1174
    iolock_begin()
8✔
1175
    s.sendbuf = sendbuf
8✔
1176
    iolock_end()
8✔
1177
    return s
8✔
1178
end
1179

1180
## low-level calls to libuv ##
1181

1182
function write(s::LibuvStream, b::UInt8)
1,978✔
1183
    buf = s.sendbuf
1,978✔
1184
    if buf !== nothing
1,978✔
1185
        iolock_begin()
438✔
1186
        if bytesavailable(buf) + 1 < buf.maxsize
438✔
1187
            n = write(buf, b)
876✔
1188
            iolock_end()
438✔
1189
            return n
438✔
1190
        end
1191
        iolock_end()
×
1192
    end
1193
    return write(s, Ref{UInt8}(b))
1,540✔
1194
end
1195

1196
function uv_writecb_task(req::Ptr{Cvoid}, status::Cint)
232✔
1197
    d = uv_req_data(req)
232✔
1198
    if d != C_NULL
232✔
1199
        uv_req_set_data(req, C_NULL) # let the Task know we got the writecb
232✔
1200
        t = unsafe_pointer_to_objref(d)::Task
232✔
1201
        schedule(t, status)
232✔
1202
    else
1203
        # no owner for this req, safe to just free it
1204
        Libc.free(req)
×
1205
    end
1206
    nothing
×
1207
end
1208

1209
function uv_shutdowncb_task(req::Ptr{Cvoid}, status::Cint)
×
1210
    d = uv_req_data(req)
×
1211
    if d != C_NULL
×
1212
        uv_req_set_data(req, C_NULL) # let the Task know we got the shutdowncb
×
1213
        t = unsafe_pointer_to_objref(d)::Task
×
1214
        schedule(t, status)
×
1215
    else
1216
        # no owner for this req, safe to just free it
1217
        Libc.free(req)
×
1218
    end
1219
    nothing
×
1220
end
1221

1222

1223
_fd(x::IOStream) = RawFD(fd(x))
6✔
1224
_fd(x::Union{OS_HANDLE, RawFD}) = x
×
1225

1226
function _fd(x::Union{LibuvStream, LibuvServer})
1227
    fd = Ref{OS_HANDLE}(INVALID_OS_HANDLE)
4✔
1228
    if x.status != StatusUninit && x.status != StatusClosed && x.handle != C_NULL
4✔
1229
        err = ccall(:uv_fileno, Int32, (Ptr{Cvoid}, Ptr{OS_HANDLE}), x.handle, fd)
4✔
1230
        # handle errors by returning INVALID_OS_HANDLE
1231
    end
1232
    return fd[]
4✔
1233
end
1234

1235
struct RedirectStdStream <: Function
1236
    unix_fd::Int
1237
    writable::Bool
1238
end
1239
for (f, writable, unix_fd) in
1240
        ((:redirect_stdin, false, 0),
1241
         (:redirect_stdout, true, 1),
1242
         (:redirect_stderr, true, 2))
1243
    @eval const ($f) = RedirectStdStream($unix_fd, $writable)
1244
end
1245
function _redirect_io_libc(stream, unix_fd::Int)
1246
    posix_fd = _fd(stream)
10✔
1247
    @static if Sys.iswindows()
1248
        if 0 <= unix_fd <= 2
1249
            ccall(:SetStdHandle, stdcall, Int32, (Int32, OS_HANDLE),
1250
                -10 - unix_fd, Libc._get_osfhandle(posix_fd))
1251
        end
1252
    end
1253
    dup(posix_fd, RawFD(unix_fd))
10✔
1254
    nothing
9✔
1255
end
1256
function _redirect_io_global(io, unix_fd::Int)
1257
    unix_fd == 0 && (global stdin = io)
15✔
1258
    unix_fd == 1 && (global stdout = io)
15✔
1259
    unix_fd == 2 && (global stderr = io)
15✔
1260
    nothing
9✔
1261
end
1262
function (f::RedirectStdStream)(handle::Union{LibuvStream, IOStream})
4✔
1263
    _redirect_io_libc(handle, f.unix_fd)
5✔
1264
    c_sym = f.unix_fd == 0 ? cglobal(:jl_uv_stdin, Ptr{Cvoid}) :
10✔
1265
            f.unix_fd == 1 ? cglobal(:jl_uv_stdout, Ptr{Cvoid}) :
1266
            f.unix_fd == 2 ? cglobal(:jl_uv_stderr, Ptr{Cvoid}) :
1267
            C_NULL
1268
    c_sym == C_NULL || unsafe_store!(c_sym, handle.handle)
10✔
1269
    _redirect_io_global(handle, f.unix_fd)
5✔
1270
    return handle
5✔
1271
end
1272
function (f::RedirectStdStream)(::DevNull)
1273
    nulldev = @static Sys.iswindows() ? "NUL" : "/dev/null"
5✔
1274
    handle = open(nulldev, write=f.writable)
5✔
1275
    _redirect_io_libc(handle, f.unix_fd)
5✔
1276
    close(handle) # handle has been dup'ed in _redirect_io_libc
5✔
1277
    _redirect_io_global(devnull, f.unix_fd)
5✔
1278
    return devnull
5✔
1279
end
1280
function (f::RedirectStdStream)(io::AbstractPipe)
5✔
1281
    io2 = (f.writable ? pipe_writer : pipe_reader)(io)
10✔
1282
    f(io2)
6✔
1283
    _redirect_io_global(io, f.unix_fd)
5✔
1284
    return io
5✔
1285
end
1286
function (f::RedirectStdStream)(p::Pipe)
×
1287
    if p.in.status == StatusInit && p.out.status == StatusInit
×
1288
        link_pipe!(p)
×
1289
    end
1290
    io2 = getfield(p, f.writable ? :in : :out)
×
1291
    f(io2)
×
1292
    return p
×
1293
end
1294
(f::RedirectStdStream)() = f(Pipe())
×
1295

1296
# Deprecate these in v2 (RedirectStdStream support)
1297
iterate(p::Pipe) = (p.out, 1)
×
1298
iterate(p::Pipe, i::Int) = i == 1 ? (p.in, 2) : nothing
×
1299
getindex(p::Pipe, key::Int) = key == 1 ? p.out : key == 2 ? p.in : throw(KeyError(key))
×
1300

1301
"""
1302
    redirect_stdout([stream]) -> stream
1303

1304
Create a pipe to which all C and Julia level [`stdout`](@ref) output
1305
will be redirected. Return a stream representing the pipe ends.
1306
Data written to [`stdout`](@ref) may now be read from the `rd` end of
1307
the pipe.
1308

1309
!!! note
1310
    `stream` must be a compatible objects, such as an `IOStream`, `TTY`,
1311
    [`Pipe`](@ref), socket, or `devnull`.
1312

1313
See also [`redirect_stdio`](@ref).
1314
"""
1315
redirect_stdout
1316

1317
"""
1318
    redirect_stderr([stream]) -> stream
1319

1320
Like [`redirect_stdout`](@ref), but for [`stderr`](@ref).
1321

1322
!!! note
1323
    `stream` must be a compatible objects, such as an `IOStream`, `TTY`,
1324
    [`Pipe`](@ref), socket, or `devnull`.
1325

1326
See also [`redirect_stdio`](@ref).
1327
"""
1328
redirect_stderr
1329

1330
"""
1331
    redirect_stdin([stream]) -> stream
1332

1333
Like [`redirect_stdout`](@ref), but for [`stdin`](@ref).
1334
Note that the direction of the stream is reversed.
1335

1336
!!! note
1337
    `stream` must be a compatible objects, such as an `IOStream`, `TTY`,
1338
    [`Pipe`](@ref), socket, or `devnull`.
1339

1340
See also [`redirect_stdio`](@ref).
1341
"""
1342
redirect_stdin
1343

1344
"""
1345
    redirect_stdio(;stdin=stdin, stderr=stderr, stdout=stdout)
1346

1347
Redirect a subset of the streams `stdin`, `stderr`, `stdout`.
1348
Each argument must be an `IOStream`, `TTY`, [`Pipe`](@ref), socket, or
1349
`devnull`.
1350

1351
!!! compat "Julia 1.7"
1352
    `redirect_stdio` requires Julia 1.7 or later.
1353
"""
1354
function redirect_stdio(;stdin=nothing, stderr=nothing, stdout=nothing)
×
1355
    stdin  === nothing || redirect_stdin(stdin)
×
1356
    stderr === nothing || redirect_stderr(stderr)
×
1357
    stdout === nothing || redirect_stdout(stdout)
×
1358
end
1359

1360
"""
1361
    redirect_stdio(f; stdin=nothing, stderr=nothing, stdout=nothing)
1362

1363
Redirect a subset of the streams `stdin`, `stderr`, `stdout`,
1364
call `f()` and restore each stream.
1365

1366
Possible values for each stream are:
1367
* `nothing` indicating the stream should not be redirected.
1368
* `path::AbstractString` redirecting the stream to the file at `path`.
1369
* `io` an `IOStream`, `TTY`, [`Pipe`](@ref), socket, or `devnull`.
1370

1371
# Examples
1372
```julia-repl
1373
julia> redirect_stdio(stdout="stdout.txt", stderr="stderr.txt") do
1374
           print("hello stdout")
1375
           print(stderr, "hello stderr")
1376
       end
1377

1378
julia> read("stdout.txt", String)
1379
"hello stdout"
1380

1381
julia> read("stderr.txt", String)
1382
"hello stderr"
1383
```
1384

1385
# Edge cases
1386

1387
It is possible to pass the same argument to `stdout` and `stderr`:
1388
```julia-repl
1389
julia> redirect_stdio(stdout="log.txt", stderr="log.txt", stdin=devnull) do
1390
    ...
1391
end
1392
```
1393

1394
However it is not supported to pass two distinct descriptors of the same file.
1395
```julia-repl
1396
julia> io1 = open("same/path", "w")
1397

1398
julia> io2 = open("same/path", "w")
1399

1400
julia> redirect_stdio(f, stdout=io1, stderr=io2) # not supported
1401
```
1402
Also the `stdin` argument may not be the same descriptor as `stdout` or `stderr`.
1403
```julia-repl
1404
julia> io = open(...)
1405

1406
julia> redirect_stdio(f, stdout=io, stdin=io) # not supported
1407
```
1408

1409
!!! compat "Julia 1.7"
1410
    `redirect_stdio` requires Julia 1.7 or later.
1411
"""
1412
function redirect_stdio(f; stdin=nothing, stderr=nothing, stdout=nothing)
×
1413

1414
    function resolve(new::Nothing, oldstream, mode)
×
1415
        (new=nothing, close=false, old=nothing)
×
1416
    end
1417
    function resolve(path::AbstractString, oldstream,mode)
×
1418
        (new=open(path, mode), close=true, old=oldstream)
×
1419
    end
1420
    function resolve(new, oldstream, mode)
×
1421
        (new=new, close=false, old=oldstream)
×
1422
    end
1423

1424
    same_path(x, y) = false
×
1425
    function same_path(x::AbstractString, y::AbstractString)
×
1426
        # if x = y = "does_not_yet_exist.txt" then samefile will return false
1427
        (abspath(x) == abspath(y)) || samefile(x,y)
×
1428
    end
1429
    if same_path(stderr, stdin)
×
1430
        throw(ArgumentError("stdin and stderr cannot be the same path"))
×
1431
    end
1432
    if same_path(stdout, stdin)
×
1433
        throw(ArgumentError("stdin and stdout cannot be the same path"))
×
1434
    end
1435

1436
    new_in , close_in , old_in  = resolve(stdin , Base.stdin , "r")
×
1437
    new_out, close_out, old_out = resolve(stdout, Base.stdout, "w")
×
1438
    if same_path(stderr, stdout)
×
1439
        # make sure that in case stderr = stdout = "same/path"
1440
        # only a single io is used instead of opening the same file twice
1441
        new_err, close_err, old_err = new_out, false, Base.stderr
×
1442
    else
1443
        new_err, close_err, old_err = resolve(stderr, Base.stderr, "w")
×
1444
    end
1445

1446
    redirect_stdio(; stderr=new_err, stdin=new_in, stdout=new_out)
×
1447

1448
    try
×
1449
        return f()
×
1450
    finally
1451
        redirect_stdio(;stderr=old_err, stdin=old_in, stdout=old_out)
×
1452
        close_err && close(new_err)
×
1453
        close_in  && close(new_in )
×
1454
        close_out && close(new_out)
×
1455
    end
1456
end
1457

1458
function (f::RedirectStdStream)(thunk::Function, stream)
×
1459
    stdold = f.unix_fd == 0 ? stdin :
×
1460
             f.unix_fd == 1 ? stdout :
1461
             f.unix_fd == 2 ? stderr :
1462
             throw(ArgumentError("Not implemented to get old handle of fd except for stdio"))
1463
    f(stream)
×
1464
    try
×
1465
        return thunk()
×
1466
    finally
1467
        f(stdold)
×
1468
    end
1469
end
1470

1471

1472
"""
1473
    redirect_stdout(f::Function, stream)
1474

1475
Run the function `f` while redirecting [`stdout`](@ref) to `stream`.
1476
Upon completion, [`stdout`](@ref) is restored to its prior setting.
1477
"""
1478
redirect_stdout(f::Function, stream)
1479

1480
"""
1481
    redirect_stderr(f::Function, stream)
1482

1483
Run the function `f` while redirecting [`stderr`](@ref) to `stream`.
1484
Upon completion, [`stderr`](@ref) is restored to its prior setting.
1485
"""
1486
redirect_stderr(f::Function, stream)
1487

1488
"""
1489
    redirect_stdin(f::Function, stream)
1490

1491
Run the function `f` while redirecting [`stdin`](@ref) to `stream`.
1492
Upon completion, [`stdin`](@ref) is restored to its prior setting.
1493
"""
1494
redirect_stdin(f::Function, stream)
1495

1496
mark(x::LibuvStream)     = mark(x.buffer)
×
1497
unmark(x::LibuvStream)   = unmark(x.buffer)
×
1498
reset(x::LibuvStream)    = reset(x.buffer)
×
1499
ismarked(x::LibuvStream) = ismarked(x.buffer)
×
1500

1501
function peek(s::LibuvStream, ::Type{T}) where T
×
1502
    mark(s)
×
1503
    try read(s, T)
×
1504
    finally
1505
        reset(s)
×
1506
    end
1507
end
1508

1509
# BufferStream's are non-OS streams, backed by a regular IOBuffer
1510
mutable struct BufferStream <: LibuvStream
1511
    buffer::IOBuffer
1512
    cond::Threads.Condition
1513
    readerror::Any
1514
    buffer_writes::Bool
1515
    lock::ReentrantLock # advisory lock
1516
    status::Int
1517

1518
    BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, false, ReentrantLock(), StatusActive)
40✔
1519
end
1520

1521
isopen(s::BufferStream) = s.status != StatusClosed
×
1522

1523
closewrite(s::BufferStream) = close(s)
×
1524

1525
function close(s::BufferStream)
×
1526
    lock(s.cond) do
×
1527
        s.status = StatusClosed
×
1528
        notify(s.cond) # aka flush
×
1529
        nothing
×
1530
    end
1531
end
1532
uvfinalize(s::BufferStream) = nothing
×
1533
setup_stdio(stream::BufferStream, child_readable::Bool) = invoke(setup_stdio, Tuple{IO, Bool}, stream, child_readable)
×
1534

1535
function read(s::BufferStream, ::Type{UInt8})
×
1536
    nread = lock(s.cond) do
×
1537
        wait_readnb(s, 1)
×
1538
        read(s.buffer, UInt8)
×
1539
    end
1540
    return nread
×
1541
end
1542
function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt)
×
1543
    lock(s.cond) do
×
1544
        wait_readnb(s, Int(nb))
×
1545
        unsafe_read(s.buffer, a, nb)
×
1546
        nothing
×
1547
    end
1548
end
1549
bytesavailable(s::BufferStream) = bytesavailable(s.buffer)
×
1550

1551
isreadable(s::BufferStream) = (isopen(s) || bytesavailable(s) > 0) && s.buffer.readable
×
1552
iswritable(s::BufferStream) = isopen(s) && s.buffer.writable
×
1553

1554
function wait_readnb(s::BufferStream, nb::Int)
×
1555
    lock(s.cond) do
×
1556
        while isopen(s) && bytesavailable(s.buffer) < nb
×
1557
            wait(s.cond)
×
1558
        end
×
1559
    end
1560
end
1561

1562
function readavailable(this::BufferStream)
×
1563
    bytes = lock(this.cond) do
×
1564
        wait_readnb(this, 1)
×
1565
        buf = this.buffer
×
1566
        @assert buf.seekable == false
×
1567
        take!(buf)
×
1568
    end
1569
    return bytes
×
1570
end
1571

1572
function read(stream::BufferStream)
1573
    bytes = lock(stream.cond) do
40✔
1574
        wait_close(stream)
1575
        take!(stream.buffer)
1576
    end
1577
    return bytes
×
1578
end
1579

1580
function readbytes!(s::BufferStream, a::Vector{UInt8}, nb::Int)
×
1581
    sbuf = s.buffer
×
1582
    @assert sbuf.seekable == false
×
1583
    @assert sbuf.maxsize >= nb
×
1584

1585
    function wait_locked(s, buf, nb)
×
1586
        while bytesavailable(buf) < nb
×
1587
            s.readerror === nothing || throw(s.readerror)
×
1588
            isopen(s) || break
×
1589
            s.status != StatusEOF || break
×
1590
            wait_readnb(s, nb)
×
1591
        end
×
1592
    end
1593

1594
    bytes = lock(s.cond) do
×
1595
        if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
×
1596
            wait_locked(s, sbuf, nb)
×
1597
        end
1598
        if bytesavailable(sbuf) >= nb
×
1599
            nread = readbytes!(sbuf, a, nb)
×
1600
        else
1601
            initsize = length(a)
×
1602
            newbuf = PipeBuffer(a, maxsize=nb)
×
1603
            newbuf.size = newbuf.offset # reset the write pointer to the beginning
×
1604
            nread = try
×
1605
                s.buffer = newbuf
×
1606
                write(newbuf, sbuf)
×
1607
                wait_locked(s, newbuf, nb)
×
1608
                bytesavailable(newbuf)
×
1609
            finally
1610
                s.buffer = sbuf
×
1611
            end
1612
            _take!(a, _unsafe_take!(newbuf))
×
1613
            length(a) >= initsize || resize!(a, initsize)
×
1614
        end
1615
        return nread
×
1616
    end
1617
    return bytes
×
1618
end
1619

1620
show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")")
×
1621

1622
function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)
×
1623
    bytes = lock(s.cond) do
×
1624
        while isopen(s) && !occursin(c, s.buffer)
×
1625
            wait(s.cond)
×
1626
        end
×
1627
        readuntil(s.buffer, c, keep=keep)
×
1628
    end
1629
    return bytes
×
1630
end
1631

1632
function wait_close(s::BufferStream)
×
1633
    lock(s.cond) do
×
1634
        while isopen(s)
×
1635
            wait(s.cond)
×
1636
        end
×
1637
    end
1638
end
1639

1640
start_reading(s::BufferStream) = Int32(0)
×
1641
stop_reading(s::BufferStream) = nothing
×
1642

1643
write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b))
×
1644
function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt)
1645
    nwrite = lock(s.cond) do
40✔
1646
        check_open(s)
1647
        rv = unsafe_write(s.buffer, p, nb)
1648
        s.buffer_writes || notify(s.cond)
1649
        rv
1650
    end
1651
    return nwrite
×
1652
end
1653

1654
function eof(s::BufferStream)
×
1655
    bytesavailable(s) > 0 && return false
×
1656
    iseof = lock(s.cond) do
×
1657
        wait_readnb(s, 1)
×
1658
        return !isopen(s) && bytesavailable(s) <= 0
×
1659
    end
1660
    return iseof
×
1661
end
1662

1663
# If buffer_writes is called, it will delay notifying waiters till a flush is called.
1664
buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s)
×
1665
function flush(s::BufferStream)
×
1666
    lock(s.cond) do
×
1667
        check_open(s)
×
1668
        notify(s.cond)
×
1669
        nothing
×
1670
    end
1671
end
1672

1673
skip(s::BufferStream, n) = skip(s.buffer, n)
×
1674

1675
function reseteof(s::BufferStream)
×
1676
    lock(s.cond) do
×
1677
        s.status = StatusOpen
×
1678
        nothing
×
1679
    end
1680
    nothing
×
1681
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc