• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / 1293

03 Oct 2025 01:05AM UTC coverage: 76.952% (-0.1%) from 77.068%
1293

push

buildkite

web-flow
Support superscript small q (#59544)

Co-authored-by: Steven G. Johnson <stevenj@alum.mit.edu>

61282 of 79637 relevant lines covered (76.95%)

21700458.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.01
/base/stream.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
import .Libc: RawFD, dup
4
if Sys.iswindows()
5
    import .Libc: WindowsRawSocket
6
    const OS_HANDLE = WindowsRawSocket
7
    const INVALID_OS_HANDLE = WindowsRawSocket(Ptr{Cvoid}(-1))
8
else
9
    const OS_HANDLE = RawFD
10
    const INVALID_OS_HANDLE = RawFD(-1)
11
end
12

13

14
## types ##
15
abstract type IOServer end
16
"""
17
    LibuvServer
18

19
An abstract type for IOServers handled by libuv.
20

21
If `server isa LibuvServer`, it must obey the following interface:
22

23
- `server.handle` must be a `Ptr{Cvoid}`
24
- `server.status` must be an `Int`
25
- `server.cond` must be a `GenericCondition`
26
"""
27
abstract type LibuvServer <: IOServer end
28

29
function getproperty(server::LibuvServer, name::Symbol)
1✔
30
    if name === :handle
33,575✔
31
        return getfield(server, :handle)::Ptr{Cvoid}
29,684✔
32
    elseif name === :status
25,767✔
33
        return getfield(server, :status)::Int
68,053✔
34
    elseif name === :cond
11,580✔
35
        return getfield(server, :cond)::GenericCondition
11,580✔
36
    else
37
        return getfield(server, name)
×
38
    end
39
end
40

41
"""
42
    LibuvStream
43

44
An abstract type for IO streams handled by libuv.
45

46
If `stream isa LibuvStream`, it must obey the following interface:
47

48
- `stream.handle`, if present, must be a `Ptr{Cvoid}`
49
- `stream.status`, if present, must be an `Int`
50
- `stream.buffer`, if present, must be an `IOBuffer`
51
- `stream.sendbuf`, if present, must be a `Union{Nothing,IOBuffer}`
52
- `stream.cond`, if present, must be a `GenericCondition`
53
- `stream.lock`, if present, must be an `AbstractLock`
54
- `stream.throttle`, if present, must be an `Int`
55
"""
56
abstract type LibuvStream <: IO end
57

58
function getproperty(stream::LibuvStream, name::Symbol)
328✔
59
    if name === :handle
46,310,437✔
60
        return getfield(stream, :handle)::Ptr{Cvoid}
3,001,782✔
61
    elseif name === :status
43,785,289✔
62
        return getfield(stream, :status)::Int
10,631,918✔
63
    elseif name === :buffer
34,349,650✔
64
        return getfield(stream, :buffer)::IOBuffer
21,419,807✔
65
    elseif name === :sendbuf
15,563,713✔
66
        return getfield(stream, :sendbuf)::Union{Nothing,IOBuffer}
8,681,329✔
67
    elseif name === :cond
6,882,384✔
68
        return getfield(stream, :cond)::GenericCondition
4,576,942✔
69
    elseif name === :lock
2,980,587✔
70
        return getfield(stream, :lock)::AbstractLock
620,933✔
71
    elseif name === :throttle
2,359,984✔
72
        return getfield(stream, :throttle)::Int
1,399,639✔
73
    else
74
        return getfield(stream, name)
961,474✔
75
    end
76
end
77

78
# IO
79
# +- GenericIOBuffer{T<:AbstractVector{UInt8}} (not exported)
80
# +- AbstractPipe (not exported)
81
# .  +- Pipe
82
# .  +- Process (not exported)
83
# .  +- ProcessChain (not exported)
84
# +- DevNull (not exported)
85
# +- Filesystem.File
86
# +- LibuvStream (not exported)
87
# .  +- PipeEndpoint (not exported)
88
# .  +- TCPSocket
89
# .  +- TTY (not exported)
90
# .  +- UDPSocket
91
# .  +- BufferStream (FIXME: 2.0)
92
# +- IOBuffer = Base.GenericIOBuffer{Vector{UInt8}}
93
# +- IOStream
94

95
# IOServer
96
# +- LibuvServer
97
# .  +- PipeServer
98
# .  +- TCPServer
99

100
# Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported)
101

102
bytesavailable(s::LibuvStream) = bytesavailable(s.buffer)
3,468,889✔
103

104
function eof(s::LibuvStream)
423✔
105
    bytesavailable(s) > 0 && return false
3,357,382✔
106
    wait_readnb(s, 1)
18,803✔
107
    # This function is race-y if used from multiple threads, but we guarantee
108
    # it to never return true until the stream is definitively exhausted
109
    # and that we won't return true if there's a readerror pending (it'll instead get thrown).
110
    # This requires some careful ordering here (TODO: atomic loads)
111
    bytesavailable(s) > 0 && return false
18,774✔
112
    open = isreadable(s) # must precede readerror check
5,170✔
113
    s.readerror === nothing || throw(s.readerror)
2,585✔
114
    return !open
2,585✔
115
end
116

117
# Limit our default maximum read and buffer size,
118
# to avoid DoS-ing ourself into an OOM situation
119
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
120

121
# manually limit our write size, if the OS doesn't support full-size writes
122
if Sys.iswindows()
123
    const MAX_OS_WRITE = UInt(0x1FF0_0000) # 511 MB (determined semi-empirically, limited to 31 MB on XP)
124
else
125
    const MAX_OS_WRITE = UInt(0x7FFF_0000) # almost 2 GB (both macOS and linux have this kernel restriction, although only macOS documents it)
126
end
127

128

129
const StatusUninit      = 0 # handle is allocated, but not initialized
130
const StatusInit        = 1 # handle is valid, but not connected/active
131
const StatusConnecting  = 2 # handle is in process of connecting
132
const StatusOpen        = 3 # handle is usable
133
const StatusActive      = 4 # handle is listening for read/write/connect events
134
const StatusClosing     = 5 # handle is closing / being closed
135
const StatusClosed      = 6 # handle is closed
136
const StatusEOF         = 7 # handle is a TTY that has seen an EOF event (pretends to be closed until reseteof is called)
137
const StatusPaused      = 8 # handle is Active, but not consuming events, and will transition to Open if it receives an event
138
function uv_status_string(x)
139
    s = x.status
28✔
140
    if x.handle == C_NULL
28✔
141
        if s == StatusClosed
×
142
            return "closed"
×
143
        elseif s == StatusUninit
×
144
            return "null"
×
145
        end
146
        return "invalid status"
×
147
    elseif s == StatusUninit
28✔
148
        return "uninit"
×
149
    elseif s == StatusInit
28✔
150
        return "init"
3✔
151
    elseif s == StatusConnecting
25✔
152
        return "connecting"
1✔
153
    elseif s == StatusOpen
24✔
154
        return "open"
21✔
155
    elseif s == StatusActive
3✔
156
        return "active"
3✔
157
    elseif s == StatusPaused
×
158
        return "paused"
×
159
    elseif s == StatusClosing
×
160
        return "closing"
×
161
    elseif s == StatusClosed
×
162
        return "closed"
×
163
    elseif s == StatusEOF
×
164
        return "eof"
×
165
    end
166
    return "invalid status"
×
167
end
168

169
mutable struct PipeEndpoint <: LibuvStream
170
    handle::Ptr{Cvoid}
171
    status::Int
172
    buffer::IOBuffer
173
    cond::ThreadSynchronizer
174
    readerror::Any
175
    sendbuf::Union{IOBuffer, Nothing}
176
    lock::ReentrantLock # advisory lock
177
    throttle::Int
178
    function PipeEndpoint(handle::Ptr{Cvoid}, status)
1✔
179
        p = new(handle,
2,235✔
180
                status,
181
                PipeBuffer(),
182
                ThreadSynchronizer(),
183
                nothing,
184
                nothing,
185
                ReentrantLock(),
186
                DEFAULT_READ_BUFFER_SZ)
187
        associate_julia_struct(handle, p)
2,235✔
188
        finalizer(uvfinalize, p)
2,235✔
189
        return p
2,235✔
190
    end
191
end
192

193
function PipeEndpoint()
2,235✔
194
    pipe = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit)
2,235✔
195
    iolock_begin()
2,235✔
196
    err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0)
2,235✔
197
    uv_error("failed to create pipe endpoint", err)
2,235✔
198
    pipe.status = StatusInit
2,235✔
199
    iolock_end()
2,235✔
200
    return pipe
2,235✔
201
end
202

203
function PipeEndpoint(fd::OS_HANDLE)
2✔
204
    pipe = PipeEndpoint()
10✔
205
    return open_pipe!(pipe, fd)
10✔
206
end
207
if OS_HANDLE != RawFD
208
    PipeEndpoint(fd::RawFD) = PipeEndpoint(Libc._get_osfhandle(fd))
1✔
209
end
210

211

212
mutable struct TTY <: LibuvStream
213
    handle::Ptr{Cvoid}
214
    status::Int
215
    buffer::IOBuffer
216
    cond::ThreadSynchronizer
217
    readerror::Any
218
    sendbuf::Union{IOBuffer, Nothing}
219
    lock::ReentrantLock # advisory lock
220
    throttle::Int
221
    @static if Sys.iswindows(); ispty::Bool; end
222
    function TTY(handle::Ptr{Cvoid}, status)
223
        tty = new(
1✔
224
            handle,
225
            status,
226
            PipeBuffer(),
227
            ThreadSynchronizer(),
228
            nothing,
229
            nothing,
230
            ReentrantLock(),
231
            DEFAULT_READ_BUFFER_SZ)
232
        associate_julia_struct(handle, tty)
1✔
233
        finalizer(uvfinalize, tty)
1✔
234
        @static if Sys.iswindows()
235
            tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Cvoid},), handle) != 0
1✔
236
        end
237
        return tty
×
238
    end
239
end
240

241
function TTY(fd::OS_HANDLE)
×
242
    tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit)
×
243
    iolock_begin()
×
244
    err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int32),
×
245
        eventloop(), tty.handle, fd, 0)
246
    uv_error("TTY", err)
×
247
    tty.status = StatusOpen
×
248
    iolock_end()
×
249
    return tty
×
250
end
251
if OS_HANDLE != RawFD
252
    TTY(fd::RawFD) = TTY(Libc._get_osfhandle(fd))
×
253
end
254

255
show(io::IO, stream::LibuvServer) = print(io, typeof(stream), "(",
×
256
    _fd(stream), " ",
257
    uv_status_string(stream), ")")
258
show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(",
19✔
259
    _fd(stream), " ",
260
    uv_status_string(stream), ", ",
261
    bytesavailable(stream.buffer), " bytes waiting)")
262

263
# Shared LibuvStream object interface
264

265
function isreadable(io::LibuvStream)
12✔
266
    bytesavailable(io) > 0 && return true
92,715✔
267
    isopen(io) || return false
93,425✔
268
    io.status == StatusEOF && return false
89,563✔
269
    return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0
88,901✔
270
end
271

272
function iswritable(io::LibuvStream)
18✔
273
    isopen(io) || return false
2,230✔
274
    io.status == StatusClosing && return false
26✔
275
    return ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), io.handle) != 0
26✔
276
end
277

278
lock(s::LibuvStream) = lock(s.lock)
312,244✔
279
unlock(s::LibuvStream) = unlock(s.lock)
310,461✔
280

281
setup_stdio(stream::Union{LibuvStream, LibuvServer}, ::Bool) = (stream, false)
1,692✔
282
rawhandle(stream::Union{LibuvStream, LibuvServer}) = stream.handle
2,574✔
283
unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer}) = s.handle
919,694✔
284

285
function init_stdio(handle::Ptr{Cvoid})
×
286
    iolock_begin()
×
287
    t = ccall(:jl_uv_handle_type, Int32, (Ptr{Cvoid},), handle)
×
288
    local io
×
289
    if t == UV_FILE
×
290
        fd = ccall(:jl_uv_file_handle, OS_HANDLE, (Ptr{Cvoid},), handle)
×
291
        # TODO: Replace ios.c file with libuv fs?
292
        # return File(fd)
293
        @static if Sys.iswindows()
×
294
            # TODO: Get ios.c to understand native handles
295
            fd = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), fd, 0)
×
296
        end
297
        # TODO: Get fdio to work natively with file descriptors instead of integers
298
        io = fdio(cconvert(Cint, fd))
×
299
    elseif t == UV_TTY
×
300
        io = TTY(handle, StatusOpen)
×
301
    elseif t == UV_TCP
×
302
        Sockets = require_stdlib(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets"))
×
303
        io = Sockets.TCPSocket(handle, StatusOpen)
×
304
    elseif t == UV_NAMED_PIPE
×
305
        io = PipeEndpoint(handle, StatusOpen)
×
306
    else
307
        throw(ArgumentError("invalid stdio type: $t"))
×
308
    end
309
    iolock_end()
×
310
    return io
×
311
end
312

313
"""
314
    open(fd::OS_HANDLE)::IO
315

316
Take a raw file descriptor and wrap it in a Julia-aware IO type,
317
and take ownership of the fd handle.
318
Call `open(Libc.dup(fd))` to avoid the ownership capture
319
of the original handle.
320

321
!!! warning
322
    Do not call this on a handle that's already owned by some
323
    other part of the system.
324
"""
325
function open(h::OS_HANDLE)
7✔
326
    iolock_begin()
7✔
327
    t = ccall(:uv_guess_handle, Cint, (OS_HANDLE,), h)
7✔
328
    local io
329
    if t == UV_FILE
7✔
330
        @static if Sys.iswindows()
331
            # TODO: Get ios.c to understand native handles
332
            h = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), h, 0)
333
        end
334
        # TODO: Get fdio to work natively with file descriptors instead of integers
335
        io = fdio(cconvert(Cint, h))
×
336
    elseif t == UV_TTY
7✔
337
        io = TTY(h)
×
338
    elseif t == UV_TCP
7✔
339
        Sockets = require_stdlib(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets"))
×
340
        io = Sockets.TCPSocket(h)
×
341
    elseif t == UV_NAMED_PIPE
7✔
342
        io = PipeEndpoint(h)
7✔
343
        @static if Sys.iswindows()
344
            if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), io.handle) != 0
3✔
345
                # replace the Julia `PipeEndpoint` type with a `TTY` type,
346
                # if we detect that this is a cygwin pty object
347
                pipe_handle, pipe_status = io.handle, io.status
1✔
348
                io.status = StatusClosed
1✔
349
                io.handle = C_NULL
1✔
350
                io = TTY(pipe_handle, pipe_status)
1✔
351
            end
352
        end
353
    else
354
        throw(ArgumentError("invalid stdio type: $t"))
×
355
    end
356
    iolock_end()
7✔
357
    return io
7✔
358
end
359

360
if OS_HANDLE != RawFD
361
    function open(fd::RawFD)
2✔
362
        h = Libc.dup(Libc._get_osfhandle(fd)) # make a dup to steal ownership away from msvcrt
2✔
363
        try
2✔
364
            io = open(h)
2✔
365
            ccall(:_close, Cint, (RawFD,), fd) # on success, destroy the old libc handle
2✔
366
            return io
2✔
367
        catch ex
368
            ccall(:CloseHandle, stdcall, Cint, (OS_HANDLE,), h) # on failure, destroy the new nt handle
×
369
            rethrow(ex)
×
370
        end
371
    end
372
end
373

374
function isopen(x::Union{LibuvStream, LibuvServer})
1,947✔
375
    if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
3,981,132✔
376
        throw(ArgumentError("stream not initialized"))
×
377
    end
378
    return x.status != StatusClosed
1,990,566✔
379
end
380

381
function check_open(x::Union{LibuvStream, LibuvServer})
30✔
382
    if !isopen(x) || x.status == StatusClosing
1,488,587✔
383
        throw(IOError("stream is closed or unusable", 0))
1✔
384
    end
385
end
386

387
function wait_readnb(x::LibuvStream, nb::Int)
117,703✔
388
    # fast path before iolock acquire
389
    bytesavailable(x.buffer) >= nb && return
117,703✔
390
    open = isopen(x) && x.status != StatusEOF # must precede readerror check
116,215✔
391
    x.readerror === nothing || throw(x.readerror)
116,216✔
392
    open || return
116,741✔
393
    iolock_begin()
115,687✔
394
    # repeat fast path after iolock acquire, before other expensive work
395
    bytesavailable(x.buffer) >= nb && (iolock_end(); return)
115,687✔
396
    open = isopen(x) && x.status != StatusEOF
115,687✔
397
    x.readerror === nothing || throw(x.readerror)
115,687✔
398
    open || (iolock_end(); return)
115,687✔
399
    # now do the "real" work
400
    oldthrottle = x.throttle
115,687✔
401
    preserve_handle(x)
115,687✔
402
    lock(x.cond)
115,687✔
403
    try
115,687✔
404
        while bytesavailable(x.buffer) < nb
236,844✔
405
            x.readerror === nothing || throw(x.readerror)
123,746✔
406
            isopen(x) || break
124,546✔
407
            x.status == StatusEOF && break
122,838✔
408
            x.throttle = max(nb, x.throttle)
121,932✔
409
            start_reading(x) # ensure we are reading
139,991✔
410
            iolock_end()
121,932✔
411
            wait(x.cond)
121,932✔
412
            unlock(x.cond)
121,157✔
413
            iolock_begin()
121,157✔
414
            lock(x.cond)
121,157✔
415
        end
236,042✔
416
    finally
417
        if isempty(x.cond)
114,914✔
418
            stop_reading(x) # stop reading iff there are currently no other read clients of the stream
114,914✔
419
        end
420
        if oldthrottle <= x.throttle <= nb
114,914✔
421
            # if we're interleaving readers, we might not get back to the "original" throttle
422
            # but we consider that an acceptable "risk", since we can't be quite sure what the intended value is now
423
            x.throttle = oldthrottle
208✔
424
        end
425
        unpreserve_handle(x)
114,914✔
426
        unlock(x.cond)
114,914✔
427
    end
428
    iolock_end()
114,885✔
429
    nothing
114,885✔
430
end
431

432
function closewrite(s::LibuvStream)
1,101✔
433
    iolock_begin()
1,101✔
434
    if !iswritable(s)
1,109✔
435
        iolock_end()
1,093✔
436
        return
1,093✔
437
    end
438
    req = Libc.malloc(_sizeof_uv_shutdown)
8✔
439
    uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
8✔
440
    err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
8✔
441
                req, s, @cfunction(uv_shutdowncb_task, Cvoid, (Ptr{Cvoid}, Cint)))
442
    if err < 0
8✔
443
        Libc.free(req)
×
444
        uv_error("shutdown", err)
×
445
    end
446
    ct = current_task()
8✔
447
    preserve_handle(ct)
8✔
448
    sigatomic_begin()
8✔
449
    uv_req_set_data(req, ct)
8✔
450
    iolock_end()
8✔
451
    local status
452
    try
8✔
453
        sigatomic_end()
8✔
454
        status = wait()::Cint
8✔
455
        sigatomic_begin()
8✔
456
    finally
457
        # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
458
        sigatomic_end()
8✔
459
        iolock_begin()
8✔
460
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
8✔
461
        if uv_req_data(req) != C_NULL
8✔
462
            # req is still alive,
463
            # so make sure we won't get spurious notifications later
464
            uv_req_set_data(req, C_NULL)
×
465
        else
466
            # done with req
467
            Libc.free(req)
8✔
468
        end
469
        iolock_end()
8✔
470
        unpreserve_handle(ct)
8✔
471
    end
472
    if isopen(s)
8✔
473
        if status < 0 || ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), s.handle) == 0
16✔
474
            close(s)
×
475
        end
476
    end
477
    if status < 0
8✔
478
        throw(_UVError("shutdown", status))
×
479
    end
480
    nothing
8✔
481
end
482

483
function wait_close(x::Union{LibuvStream, LibuvServer})
9,713✔
484
    preserve_handle(x)
9,713✔
485
    lock(x.cond)
9,713✔
486
    try
9,713✔
487
        while isopen(x)
18,724✔
488
            wait(x.cond)
9,062✔
489
        end
9,011✔
490
    finally
491
        unlock(x.cond)
9,662✔
492
        unpreserve_handle(x)
9,662✔
493
    end
494
    nothing
9,662✔
495
end
496

497
function close(stream::Union{LibuvStream, LibuvServer})
9,710✔
498
    iolock_begin()
9,710✔
499
    if stream.status == StatusInit
9,710✔
500
        ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
6✔
501
        stream.status = StatusClosing
6✔
502
    elseif isopen(stream)
9,704✔
503
        if stream.status != StatusClosing
9,053✔
504
            ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
9,053✔
505
            stream.status = StatusClosing
9,053✔
506
        end
507
    end
508
    iolock_end()
9,710✔
509
    wait_close(stream)
9,710✔
510
    nothing
9,659✔
511
end
512

513
function uvfinalize(uv::Union{LibuvStream, LibuvServer})
2,419✔
514
    iolock_begin()
2,419✔
515
    if uv.handle != C_NULL
2,419✔
516
        disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
2,419✔
517
        if uv.status == StatusUninit
2,419✔
518
            Libc.free(uv.handle)
×
519
        elseif uv.status == StatusInit
2,419✔
520
            ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
2✔
521
        elseif isopen(uv)
2,417✔
522
            if uv.status != StatusClosing
516✔
523
                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
516✔
524
            end
525
        elseif uv.status == StatusClosed
1,901✔
526
            Libc.free(uv.handle)
1,901✔
527
        end
528
        uv.handle = C_NULL
2,419✔
529
        uv.status = StatusClosed
2,419✔
530
    end
531
    iolock_end()
2,419✔
532
    nothing
2,419✔
533
end
534

535
if Sys.iswindows()
536
    ispty(s::TTY) = s.ispty
1✔
537
    ispty(s::IO) = false
×
538
end
539

540
"""
541
    displaysize([io::IO]) -> (lines, columns)
542

543
Return the nominal size of the screen that may be used for rendering output to
544
this `IO` object.
545
If no input is provided, the environment variables `LINES` and `COLUMNS` are read.
546
If those are not set, a default size of `(24, 80)` is returned.
547

548
# Examples
549
```jldoctest
550
julia> withenv("LINES" => 30, "COLUMNS" => 100) do
551
           displaysize()
552
       end
553
(30, 100)
554
```
555

556
To get your TTY size,
557

558
```julia-repl
559
julia> displaysize(stdout)
560
(34, 147)
561
```
562
"""
563
displaysize(io::IO) = displaysize()
106,085✔
564
displaysize() = (parse(Int, get(ENV, "LINES",   "24")),
90,272✔
565
                 parse(Int, get(ENV, "COLUMNS", "80")))::Tuple{Int, Int}
566

567
# This is a fancy way to make de-specialize a call to `displaysize(io::IO)`
568
# which is unfortunately invalidated by REPL
569
#  (https://github.com/JuliaLang/julia/issues/56080)
570
#
571
# This makes the call less efficient, but avoids being invalidated by REPL.
572
displaysize_(io::IO) = Base.invoke_in_world(Base.tls_world_age(), displaysize, io)::Tuple{Int,Int}
90,215✔
573

574
function displaysize(io::TTY)
60✔
575
    check_open(io)
60✔
576

577
    local h::Int, w::Int
578
    default_size = displaysize()
59✔
579

580
    @static if Sys.iswindows()
581
        if ispty(io)
1✔
582
            # io is actually a libuv pipe but a cygwin/msys2 pty
583
            try
1✔
584
                h, w = parse.(Int, split(read(open(Base.Cmd(String["stty", "size"]), "r", io).out, String)))
1✔
585
                h > 0 || (h = default_size[1])
586
                w > 0 || (w = default_size[2])
587
                return h, w
588
            catch
589
                return default_size
1✔
590
            end
591
        end
592
    end
593

594
    s1 = Ref{Int32}(0)
58✔
595
    s2 = Ref{Int32}(0)
58✔
596
    iolock_begin()
58✔
597
    check_open(io)
58✔
598
    Base.uv_error("size (TTY)", ccall(:uv_tty_get_winsize,
58✔
599
                                      Int32, (Ptr{Cvoid}, Ptr{Int32}, Ptr{Int32}),
600
                                      io, s1, s2) != 0)
601
    iolock_end()
58✔
602
    w, h = s1[], s2[]
58✔
603
    h > 0 || (h = default_size[1])
58✔
604
    w > 0 || (w = default_size[2])
58✔
605
    return h, w
58✔
606
end
607

608
### Libuv callbacks ###
609

610
## BUFFER ##
611
## Allocate space in buffer (for immediate use)
612
function alloc_request(buffer::IOBuffer, recommended_size::UInt)
1✔
613
    ensureroom(buffer, recommended_size)
978,251✔
614
    ptr = buffer.append ? buffer.size + 1 : buffer.ptr
489,126✔
615
    nb = min(length(buffer.data), buffer.maxsize + get_offset(buffer)) - ptr + 1
489,126✔
616
    return (Ptr{Cvoid}(pointer(buffer.data, ptr)), nb)
489,126✔
617
end
618

619
notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Cvoid}, len::UInt) = notify_filled(buffer, nread)
×
620

621
function notify_filled(buffer::IOBuffer, nread::Int)
1✔
622
    if buffer.append
557,980✔
623
        buffer.size += nread
557,980✔
624
    else
625
        buffer.ptr += nread
×
626
        buffer.size = max(buffer.size, buffer.ptr - 1)
×
627
    end
628
    nothing
488,002✔
629
end
630

631
function alloc_buf_hook(stream::LibuvStream, size::UInt)
1✔
632
    throttle = UInt(stream.throttle)
489,126✔
633
    return alloc_request(stream.buffer, (size > throttle) ? throttle : size)
489,126✔
634
end
635

636
function uv_alloc_buf(handle::Ptr{Cvoid}, size::Csize_t, buf::Ptr{Cvoid})
489,165✔
637
    hd = uv_handle_data(handle)
489,165✔
638
    if hd == C_NULL
489,165✔
639
        ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, 0)
×
640
        return nothing
×
641
    end
642
    stream = unsafe_pointer_to_objref(hd)::LibuvStream
489,165✔
643

644
    local data::Ptr{Cvoid}, newsize::Csize_t
645
    if stream.status != StatusActive
489,165✔
646
        data = C_NULL
39✔
647
        newsize = 0
39✔
648
    else
649
        (data, newsize) = alloc_buf_hook(stream, UInt(size))
978,252✔
650
        if data == C_NULL
489,126✔
651
            newsize = 0
×
652
        end
653
        # avoid aliasing of `nread` with `errno` in uv_readcb
654
        # or exceeding the Win32 maximum uv_buf_t len
655
        maxsize = @static Sys.iswindows() ? typemax(Cint) : typemax(Cssize_t)
489,126✔
656
        newsize > maxsize && (newsize = maxsize)
489,126✔
657
    end
658

659
    ccall(:jl_uv_buf_set_base, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), buf, data)
489,165✔
660
    ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, newsize)
489,165✔
661
    nothing
489,165✔
662
end
663

664
function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
491,753✔
665
    stream_unknown_type = @handle_as handle LibuvStream
491,753✔
666
    nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
491,753✔
667
    function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
1,053,197✔
668
        lock(stream.cond)
561,444✔
669
        if nread < 0
561,444✔
670
            if nread == UV_ENOBUFS && nrequested == 0
3,464✔
671
                # remind the client that stream.buffer is full
672
                notify(stream.cond)
344✔
673
            elseif nread == UV_EOF # libuv called uv_stop_reading already
3,120✔
674
                if stream.status != StatusClosing
3,093✔
675
                    stream.status = StatusEOF
3,093✔
676
                    notify(stream.cond)
3,093✔
677
                    if stream isa TTY
3,093✔
678
                        # stream can still be used by reseteof (or possibly write)
679
                    elseif !(stream isa PipeEndpoint) && ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0
3,034✔
680
                        # stream can still be used by write
681
                    else
682
                        # underlying stream is no longer useful: begin finalization
683
                        ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
917✔
684
                        stream.status = StatusClosing
917✔
685
                    end
686
                end
687
            else
688
                stream.readerror = _UVError("read", nread)
27✔
689
                notify(stream.cond)
27✔
690
                # This is a fatal connection error
691
                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
27✔
692
                stream.status = StatusClosing
27✔
693
            end
694
        else
695
            notify_filled(stream.buffer, nread)
557,980✔
696
            notify(stream.cond)
557,980✔
697
        end
698
        unlock(stream.cond)
561,444✔
699

700
        # Stop background reading when
701
        # 1) there's nobody paying attention to the data we are reading
702
        # 2) we have accumulated a lot of unread data OR
703
        # 3) we have an alternate buffer that has reached its limit.
704
        if stream.status == StatusPaused ||
1,122,544✔
705
           (stream.status == StatusActive &&
706
            ((bytesavailable(stream.buffer) >= stream.throttle) ||
707
             (bytesavailable(stream.buffer) >= stream.buffer.maxsize)))
708
            # save cycles by stopping kernel notifications from arriving
709
            ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
734✔
710
            stream.status = StatusOpen
734✔
711
        end
712
        nothing
561,444✔
713
    end
714
    readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested))
491,753✔
715
    nothing
491,753✔
716
end
717

718
function reseteof(x::TTY)
6✔
719
    iolock_begin()
6✔
720
    if x.status == StatusEOF
6✔
721
        x.status = StatusOpen
×
722
    end
723
    iolock_end()
6✔
724
    nothing
6✔
725
end
726

727
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
2,177✔
728
    lock(uv.cond)
2,177✔
729
    try
2,177✔
730
        uv.status = StatusClosed
2,177✔
731
        # notify any listeners that exist on this libuv stream type
732
        notify(uv.cond)
2,177✔
733
    finally
734
        unlock(uv.cond)
2,177✔
735
    end
736
    nothing
2,177✔
737
end
738

739

740
##########################################
741
# Pipe Abstraction
742
#  (composed of two half-pipes: .in and .out)
743
##########################################
744

745
mutable struct Pipe <: AbstractPipe
746
    in::PipeEndpoint # writable
947✔
747
    out::PipeEndpoint # readable
748
end
749

750
"""
751
    Pipe()
752

753
Construct an uninitialized Pipe object, especially for IO communication between multiple processes.
754

755
The appropriate end of the pipe will be automatically initialized if the object is used in
756
process spawning. This can be useful to easily obtain references in process pipelines, e.g.:
757

758
```
759
julia> err = Pipe()
760

761
# After this `err` will be initialized and you may read `foo`'s
762
# stderr from the `err` pipe, or pass `err` to other pipelines.
763
julia> run(pipeline(pipeline(`foo`, stderr=err), `cat`), wait=false)
764

765
# Now destroy the write half of the pipe, so that the read half will get EOF
766
julia> closewrite(err)
767

768
julia> read(err, String)
769
"stderr messages"
770
```
771

772
See also [`Base.link_pipe!`](@ref).
773
"""
774
Pipe() = Pipe(PipeEndpoint(), PipeEndpoint())
952✔
775
pipe_reader(p::Pipe) = p.out
827✔
776
pipe_writer(p::Pipe) = p.in
196,818✔
777

778
"""
779
    link_pipe!(pipe; reader_supports_async=false, writer_supports_async=false)
780

781
Initialize `pipe` and link the `in` endpoint to the `out` endpoint. The keyword
782
arguments `reader_supports_async`/`writer_supports_async` correspond to
783
`OVERLAPPED` on Windows and `O_NONBLOCK` on POSIX systems. They should be `true`
784
unless they'll be used by an external program (e.g. the output of a command
785
executed with [`run`](@ref)).
786
"""
787
function link_pipe!(pipe::Pipe;
964✔
788
                    reader_supports_async = false,
789
                    writer_supports_async = false)
790
     link_pipe!(pipe.out, reader_supports_async, pipe.in, writer_supports_async)
964✔
791
     return pipe
948✔
792
end
793

794
show(io::IO, stream::Pipe) = print(io,
3✔
795
    "Pipe(",
796
    _fd(stream.in), " ",
797
    uv_status_string(stream.in), " => ",
798
    _fd(stream.out), " ",
799
    uv_status_string(stream.out), ", ",
800
    bytesavailable(stream), " bytes waiting)")
801

802
closewrite(pipe::Pipe) = close(pipe.in)
×
803

804
## Functions for PipeEndpoint and PipeServer ##
805

806
function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE)
1✔
807
    iolock_begin()
2,234✔
808
    if p.status != StatusInit
2,234✔
809
        error("pipe is already in use or has been closed")
×
810
    end
811
    err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle)
2,234✔
812
    uv_error("pipe_open", err)
2,234✔
813
    p.status = StatusOpen
2,233✔
814
    iolock_end()
2,233✔
815
    return p
2,233✔
816
end
817

818

819
function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool,
299✔
820
                    write_end::PipeEndpoint, writer_supports_async::Bool)
821
    rd, wr = link_pipe(reader_supports_async, writer_supports_async)
299✔
822
    try
298✔
823
        try
298✔
824
            open_pipe!(read_end, rd)
298✔
825
        catch
826
            close_pipe_sync(rd)
1✔
827
            rethrow()
1✔
828
        end
829
        open_pipe!(write_end, wr)
297✔
830
    catch
831
        close_pipe_sync(wr)
1✔
832
        rethrow()
1✔
833
    end
834
    nothing
297✔
835
end
836

837
function link_pipe(reader_supports_async::Bool, writer_supports_async::Bool)
1✔
838
    UV_NONBLOCK_PIPE = 0x40
1,930✔
839
    fildes = Ref{Pair{OS_HANDLE, OS_HANDLE}}(INVALID_OS_HANDLE => INVALID_OS_HANDLE) # read (in) => write (out)
1,930✔
840
    err = ccall(:uv_pipe, Int32, (Ptr{Pair{OS_HANDLE, OS_HANDLE}}, Cint, Cint),
1,930✔
841
                fildes,
842
                reader_supports_async * UV_NONBLOCK_PIPE,
843
                writer_supports_async * UV_NONBLOCK_PIPE)
844
    uv_error("pipe", err)
1,930✔
845
    return fildes[]
1,929✔
846
end
847

848
if Sys.iswindows()
849
    function close_pipe_sync(handle::WindowsRawSocket)
850
        ccall(:CloseHandle, stdcall, Cint, (WindowsRawSocket,), handle)
443✔
851
        nothing
414✔
852
    end
853
else
854
    function close_pipe_sync(handle::RawFD)
1✔
855
        ccall(:close, Cint, (RawFD,), handle)
1,314✔
856
        nothing
1,218✔
857
    end
858
end
859

860
## Functions for any LibuvStream ##
861

862
# flow control
863

864
function start_reading(stream::LibuvStream)
69,162✔
865
    iolock_begin()
453,249✔
866
    if stream.status == StatusOpen
453,249✔
867
        if !isreadable(stream)
179,002✔
868
            error("tried to read a stream that is not readable")
×
869
        end
870
        # libuv may call the alloc callback immediately
871
        # for a TTY on Windows, so ensure the status is set first
872
        stream.status = StatusActive
90,110✔
873
        ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
90,110✔
874
                    stream, @cfunction(uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
875
                    @cfunction(uv_readcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid})))
876
    elseif stream.status == StatusPaused
363,139✔
877
        stream.status = StatusActive
179,798✔
878
        ret = Int32(0)
179,798✔
879
    elseif stream.status == StatusActive
183,341✔
880
        ret = Int32(0)
183,341✔
881
    else
882
        ret = Int32(-1)
×
883
    end
884
    iolock_end()
453,249✔
885
    return ret
453,249✔
886
end
887

888
if Sys.iswindows()
889
    # the low performance version of stop_reading is required
890
    # on Windows due to a NT kernel bug that we can't use a blocking
891
    # stream for non-blocking (overlapped) calls,
892
    # and a ReadFile call blocking on one thread
893
    # causes all other operations on that stream to lockup
894
    function stop_reading(stream::LibuvStream)
895
        iolock_begin()
86,513✔
896
        if stream.status == StatusActive
86,513✔
897
            stream.status = StatusOpen
85,376✔
898
            ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
85,376✔
899
        end
900
        iolock_end()
86,513✔
901
        nothing
86,475✔
902
    end
903
else
904
    function stop_reading(stream::LibuvStream)
905
        iolock_begin()
182,736✔
906
        if stream.status == StatusActive
182,736✔
907
            stream.status = StatusPaused
180,336✔
908
        end
909
        iolock_end()
182,736✔
910
        nothing
182,660✔
911
    end
912
end
913

914
# bulk read / write
915

916
readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a)) = readbytes!(s, a, Int(nb))
420✔
917
function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
563,291✔
918
    iolock_begin()
563,291✔
919
    sbuf = s.buffer
563,291✔
920
    @assert sbuf.seekable == false
563,291✔
921
    @assert sbuf.maxsize >= nb
563,291✔
922

923
    function wait_locked(s, buf, nb)
1,126,583✔
924
        while bytesavailable(buf) < nb
570,384✔
925
            s.readerror === nothing || throw(s.readerror)
7,608✔
926
            isopen(s) || break
8,046✔
927
            s.status != StatusEOF || break
7,248✔
928
            iolock_end()
7,092✔
929
            wait_readnb(s, nb)
7,092✔
930
            iolock_begin()
7,092✔
931
        end
7,092✔
932
    end
933

934
    if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
563,291✔
935
        wait_locked(s, sbuf, nb)
562,412✔
936
    end
937
    if bytesavailable(sbuf) >= nb
563,291✔
938
        nread = readbytes!(sbuf, a, nb)
562,411✔
939
    else
940
        initsize = length(a)
880✔
941
        newbuf = _truncated_pipebuffer(a; maxsize=nb)
880✔
942
        nread = try
880✔
943
            s.buffer = newbuf
880✔
944
            write(newbuf, sbuf)
880✔
945
            wait_locked(s, newbuf, nb)
880✔
946
            bytesavailable(newbuf)
880✔
947
        finally
948
            s.buffer = sbuf
880✔
949
        end
950
        _take!(a, _unsafe_take!(newbuf))
880✔
951
        length(a) >= initsize || resize!(a, initsize)
880✔
952
    end
953
    iolock_end()
563,291✔
954
    return nread
563,291✔
955
end
956

957
function read(stream::LibuvStream)
1,234✔
958
    wait_readnb(stream, typemax(Int))
1,323✔
959
    iolock_begin()
1,322✔
960
    bytes = take!(stream.buffer)
1,322✔
961
    iolock_end()
1,322✔
962
    return bytes
1,298✔
963
end
964

965
function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
2,329,756✔
966
    iolock_begin()
2,329,756✔
967
    sbuf = s.buffer
2,329,756✔
968
    @assert sbuf.seekable == false
2,329,756✔
969
    @assert sbuf.maxsize >= nb
2,329,756✔
970

971
    function wait_locked(s, buf, nb)
4,659,249✔
972
        while bytesavailable(buf) < nb
2,437,431✔
973
            s.readerror === nothing || throw(s.readerror)
109,334✔
974
            isopen(s) || throw(EOFError())
109,376✔
975
            s.status != StatusEOF || throw(EOFError())
109,872✔
976
            iolock_end()
108,712✔
977
            wait_readnb(s, nb)
108,712✔
978
            iolock_begin()
107,938✔
979
        end
107,938✔
980
    end
981

982
    if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
2,329,756✔
983
        wait_locked(s, sbuf, Int(nb))
2,329,082✔
984
    end
985
    if bytesavailable(sbuf) >= nb
2,328,378✔
986
        unsafe_read(sbuf, p, nb)
2,327,967✔
987
    else
988
        newbuf = _truncated_pipebuffer(unsafe_wrap(Array, p, nb); maxsize=Int(nb))
411✔
989
        try
411✔
990
            s.buffer = newbuf
411✔
991
            write(newbuf, sbuf)
411✔
992
            wait_locked(s, newbuf, Int(nb))
411✔
993
        finally
994
            s.buffer = sbuf
411✔
995
        end
996
    end
997
    iolock_end()
2,328,360✔
998
    nothing
2,328,360✔
999
end
1000

1001
function read(this::LibuvStream, ::Type{UInt8})
10,527,317✔
1002
    iolock_begin()
10,527,317✔
1003
    sbuf = this.buffer
10,527,317✔
1004
    @assert sbuf.seekable == false
10,527,317✔
1005
    while bytesavailable(sbuf) < 1
10,527,757✔
1006
        iolock_end()
451✔
1007
        eof(this) && throw(EOFError())
902✔
1008
        iolock_begin()
440✔
1009
    end
440✔
1010
    c = read(sbuf, UInt8)
10,527,306✔
1011
    iolock_end()
10,527,306✔
1012
    return c
10,527,306✔
1013
end
1014

1015
function readavailable(this::LibuvStream)
2,060✔
1016
    wait_readnb(this, 1) # unlike the other `read` family of functions, this one doesn't guarantee error reporting
3,476✔
1017
    iolock_begin()
3,476✔
1018
    buf = this.buffer
3,476✔
1019
    @assert buf.seekable == false
3,476✔
1020
    bytes = take!(buf)
3,476✔
1021
    iolock_end()
3,476✔
1022
    return bytes
3,476✔
1023
end
1024

1025
function copyuntil(out::IO, x::LibuvStream, c::UInt8; keep::Bool=false)
317,656✔
1026
    iolock_begin()
158,134✔
1027
    buf = x.buffer
158,134✔
1028
    @assert buf.seekable == false
158,134✔
1029
    if !occursin(c, buf) # fast path checks first
161,869✔
1030
        x.readerror === nothing || throw(x.readerror)
154,399✔
1031
        if isopen(x) && x.status != StatusEOF
154,399✔
1032
            preserve_handle(x)
154,335✔
1033
            lock(x.cond)
154,335✔
1034
            try
154,335✔
1035
                while !occursin(c, x.buffer)
639,818✔
1036
                    x.readerror === nothing || throw(x.readerror)
331,754✔
1037
                    isopen(x) || break
331,813✔
1038
                    x.status != StatusEOF || break
331,939✔
1039
                    start_reading(x) # ensure we are reading
381,880✔
1040
                    iolock_end()
331,451✔
1041
                    wait(x.cond)
331,451✔
1042
                    unlock(x.cond)
331,451✔
1043
                    iolock_begin()
331,451✔
1044
                    lock(x.cond)
331,451✔
1045
                end
485,786✔
1046
            finally
1047
                if isempty(x.cond)
154,335✔
1048
                    stop_reading(x) # stop reading iff there are currently no other read clients of the stream
154,335✔
1049
                end
1050
                unlock(x.cond)
154,335✔
1051
                unpreserve_handle(x)
154,335✔
1052
            end
1053
        end
1054
    end
1055
    copyuntil(out, buf, c; keep)
158,134✔
1056
    iolock_end()
158,134✔
1057
    return out
158,134✔
1058
end
1059

1060
uv_write(s::LibuvStream, p::Vector{UInt8}) = GC.@preserve p uv_write(s, pointer(p), UInt(sizeof(p)))
116,552✔
1061

1062
# caller must have acquired the iolock
1063
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
731,082✔
1064
    uvw = uv_write_async(s, p, n)
731,082✔
1065
    ct = current_task()
731,082✔
1066
    preserve_handle(ct)
731,082✔
1067
    sigatomic_begin()
731,082✔
1068
    uv_req_set_data(uvw, ct)
731,082✔
1069
    iolock_end()
731,082✔
1070
    local status
1071
    try
731,082✔
1072
        sigatomic_end()
731,082✔
1073
        # wait for the last chunk to complete (or error)
1074
        # assume that any errors would be sticky,
1075
        # (so we don't need to monitor the error status of the intermediate writes)
1076
        status = wait()::Cint
731,082✔
1077
        sigatomic_begin()
731,075✔
1078
    finally
1079
        # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
1080
        sigatomic_end()
731,081✔
1081
        iolock_begin()
731,081✔
1082
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
731,081✔
1083
        if uv_req_data(uvw) != C_NULL
731,081✔
1084
            # uvw is still alive,
1085
            # so make sure we won't get spurious notifications later
1086
            uv_req_set_data(uvw, C_NULL)
2✔
1087
        else
1088
            # done with uvw
1089
            Libc.free(uvw)
731,079✔
1090
        end
1091
        iolock_end()
731,081✔
1092
        unpreserve_handle(ct)
731,081✔
1093
    end
1094
    if status < 0
731,075✔
1095
        throw(_UVError("write", status))
×
1096
    end
1097
    return Int(n)
731,075✔
1098
end
1099

1100
# helper function for uv_write that returns the uv_write_t struct for the write
1101
# rather than waiting on it, caller must hold the iolock
1102
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
731,359✔
1103
    check_open(s)
731,359✔
1104
    while true
731,359✔
1105
        uvw = Libc.malloc(_sizeof_uv_write)
731,359✔
1106
        uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
731,359✔
1107
        nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
731,359✔
1108
        # TODO: use writev instead of a loop
1109
        err = ccall(:jl_uv_write,
731,359✔
1110
                    Int32,
1111
                    (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}),
1112
                    s, p, nwrite, uvw,
1113
                    @cfunction(uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint)))
1114
        if err < 0
731,359✔
1115
            Libc.free(uvw)
×
1116
            uv_error("write", err)
×
1117
        end
1118
        n -= nwrite
731,359✔
1119
        p += nwrite
731,359✔
1120
        if n == 0
731,359✔
1121
            return uvw
731,359✔
1122
        end
1123
    end
×
1124
end
1125

1126

1127
# Optimized send
1128
# - smaller writes are buffered, final uv write on flush or when buffer full
1129
# - large isbits arrays are unbuffered and written directly
1130

1131
function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
2,750,499✔
1132
    while true
2,751,232✔
1133
        # try to add to the send buffer
1134
        iolock_begin()
2,751,232✔
1135
        buf = s.sendbuf
2,751,232✔
1136
        buf === nothing && break
2,751,232✔
1137
        totb = bytesavailable(buf) + n
2,137,381✔
1138
        if totb < buf.maxsize
2,137,381✔
1139
            nb = unsafe_write(buf, p, n)
2,136,011✔
1140
            iolock_end()
2,136,011✔
1141
            return nb
2,136,011✔
1142
        end
1143
        bytesavailable(buf) == 0 && break
1,370✔
1144
        # perform flush(s)
1145
        arr = take!(buf)
733✔
1146
        uv_write(s, arr)
733✔
1147
    end
733✔
1148
    # perform the output to the kernel
1149
    return uv_write(s, p, n)
614,488✔
1150
end
1151

1152
function flush(s::LibuvStream)
116,526✔
1153
    iolock_begin()
116,526✔
1154
    buf = s.sendbuf
116,526✔
1155
    if buf !== nothing
116,526✔
1156
        if bytesavailable(buf) > 0
115,819✔
1157
            arr = take!(buf)
115,819✔
1158
            uv_write(s, arr)
115,819✔
1159
            return
115,819✔
1160
        end
1161
    end
1162
    uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue
707✔
1163
    return
707✔
1164
end
1165

1166
function buffer_writes(s::LibuvStream, bufsize)
863✔
1167
    sendbuf = PipeBuffer(bufsize)
1,333✔
1168
    iolock_begin()
1,333✔
1169
    s.sendbuf = sendbuf
1,333✔
1170
    iolock_end()
1,333✔
1171
    return s
1,333✔
1172
end
1173

1174
## low-level calls to libuv ##
1175

1176
function write(s::LibuvStream, b::UInt8)
5,813,571✔
1177
    buf = s.sendbuf
5,813,571✔
1178
    if buf !== nothing
5,813,571✔
1179
        iolock_begin()
5,797,953✔
1180
        if bytesavailable(buf) + 1 < buf.maxsize
5,797,953✔
1181
            n = write(buf, b)
11,595,810✔
1182
            iolock_end()
5,797,905✔
1183
            return n
5,797,905✔
1184
        end
1185
        iolock_end()
48✔
1186
    end
1187
    return write(s, Ref{UInt8}(b))
15,666✔
1188
end
1189

1190
function uv_writecb_task(req::Ptr{Cvoid}, status::Cint)
673,295✔
1191
    d = uv_req_data(req)
673,295✔
1192
    if d != C_NULL
673,295✔
1193
        uv_req_set_data(req, C_NULL) # let the Task know we got the writecb
673,294✔
1194
        t = unsafe_pointer_to_objref(d)::Task
673,294✔
1195
        schedule(t, status)
673,294✔
1196
    else
1197
        # no owner for this req, safe to just free it
1198
        Libc.free(req)
1✔
1199
    end
1200
    nothing
673,295✔
1201
end
1202

1203
function uv_shutdowncb_task(req::Ptr{Cvoid}, status::Cint)
×
1204
    d = uv_req_data(req)
×
1205
    if d != C_NULL
×
1206
        uv_req_set_data(req, C_NULL) # let the Task know we got the shutdowncb
×
1207
        t = unsafe_pointer_to_objref(d)::Task
×
1208
        schedule(t, status)
×
1209
    else
1210
        # no owner for this req, safe to just free it
1211
        Libc.free(req)
×
1212
    end
1213
    nothing
×
1214
end
1215

1216

1217
_fd(x::IOStream) = RawFD(fd(x))
471✔
1218
_fd(x::Union{OS_HANDLE, RawFD}) = x
×
1219

1220
function _fd(x::Union{LibuvStream, LibuvServer})
6✔
1221
    fd = Ref{OS_HANDLE}(INVALID_OS_HANDLE)
154✔
1222
    if x.status != StatusUninit && x.status != StatusClosed && x.handle != C_NULL
154✔
1223
        err = ccall(:uv_fileno, Int32, (Ptr{Cvoid}, Ptr{OS_HANDLE}), x.handle, fd)
148✔
1224
        # handle errors by returning INVALID_OS_HANDLE
1225
    end
1226
    return fd[]
154✔
1227
end
1228

1229
struct RedirectStdStream <: Function
1230
    unix_fd::Int
1231
    writable::Bool
1232
end
1233
for (f, writable, unix_fd) in
1234
        ((:redirect_stdin, false, 0),
1235
         (:redirect_stdout, true, 1),
1236
         (:redirect_stderr, true, 2))
1237
    @eval const ($f) = RedirectStdStream($unix_fd, $writable)
1238
end
1239
function _redirect_io_libc(stream, unix_fd::Int)
1240
    posix_fd = _fd(stream)
576✔
1241
    @static if Sys.iswindows()
1242
        if 0 <= unix_fd <= 2
81✔
1243
            ccall(:SetStdHandle, stdcall, Int32, (Int32, OS_HANDLE),
81✔
1244
                -10 - unix_fd, Libc._get_osfhandle(posix_fd))
1245
        end
1246
    end
1247
    dup(posix_fd, RawFD(unix_fd))
576✔
1248
    nothing
539✔
1249
end
1250
function _redirect_io_global(io, unix_fd::Int)
1251
    unix_fd == 0 && (global stdin = io)
1,386✔
1252
    unix_fd == 1 && (global stdout = io)
1,386✔
1253
    unix_fd == 2 && (global stderr = io)
1,386✔
1254
    nothing
974✔
1255
end
1256
function (f::RedirectStdStream)(handle::Union{LibuvStream, IOStream})
140✔
1257
    _redirect_io_libc(handle, f.unix_fd)
201✔
1258
    c_sym = f.unix_fd == 0 ? cglobal(:jl_uv_stdin, Ptr{Cvoid}) :
359✔
1259
            f.unix_fd == 1 ? cglobal(:jl_uv_stdout, Ptr{Cvoid}) :
1260
            f.unix_fd == 2 ? cglobal(:jl_uv_stderr, Ptr{Cvoid}) :
1261
            C_NULL
1262
    c_sym == C_NULL || unsafe_store!(c_sym, handle.handle)
402✔
1263
    _redirect_io_global(handle, f.unix_fd)
201✔
1264
    return handle
201✔
1265
end
1266
function (f::RedirectStdStream)(::DevNull)
5✔
1267
    nulldev = @static Sys.iswindows() ? "NUL" : "/dev/null"
367✔
1268
    handle = open(nulldev, write=f.writable)
375✔
1269
    _redirect_io_libc(handle, f.unix_fd)
375✔
1270
    close(handle) # handle has been dup'ed in _redirect_io_libc
375✔
1271
    _redirect_io_global(devnull, f.unix_fd)
375✔
1272
    return devnull
375✔
1273
end
1274
function (f::RedirectStdStream)(io::AbstractPipe)
780✔
1275
    io2 = (f.writable ? pipe_writer : pipe_reader)(io)
1,590✔
1276
    f(io2)
811✔
1277
    _redirect_io_global(io, f.unix_fd)
810✔
1278
    return io
810✔
1279
end
1280
function (f::RedirectStdStream)(p::Pipe)
1281
    if p.in.status == StatusInit && p.out.status == StatusInit
29✔
1282
        link_pipe!(p)
29✔
1283
    end
1284
    io2 = getfield(p, f.writable ? :in : :out)
29✔
1285
    f(io2)
29✔
1286
    return p
29✔
1287
end
1288
(f::RedirectStdStream)() = f(Pipe())
29✔
1289

1290
# Deprecate these in v2 (RedirectStdStream support)
1291
iterate(p::Pipe) = (p.out, 1)
39✔
1292
iterate(p::Pipe, i::Int) = i == 1 ? (p.in, 2) : nothing
44✔
1293
getindex(p::Pipe, key::Int) = key == 1 ? p.out : key == 2 ? p.in : throw(KeyError(key))
×
1294

1295
"""
1296
    redirect_stdout([stream]) -> stream
1297

1298
Create a pipe to which all C and Julia level [`stdout`](@ref) output
1299
will be redirected. Return a stream representing the pipe ends.
1300
Data written to [`stdout`](@ref) may now be read from the `rd` end of
1301
the pipe.
1302

1303
!!! note
1304
    `stream` must be a compatible objects, such as an `IOStream`, `TTY`,
1305
    [`Pipe`](@ref), socket, or `devnull`.
1306

1307
See also [`redirect_stdio`](@ref).
1308
"""
1309
redirect_stdout
1310

1311
"""
1312
    redirect_stderr([stream]) -> stream
1313

1314
Like [`redirect_stdout`](@ref), but for [`stderr`](@ref).
1315

1316
!!! note
1317
    `stream` must be a compatible objects, such as an `IOStream`, `TTY`,
1318
    [`Pipe`](@ref), socket, or `devnull`.
1319

1320
See also [`redirect_stdio`](@ref).
1321
"""
1322
redirect_stderr
1323

1324
"""
1325
    redirect_stdin([stream]) -> stream
1326

1327
Like [`redirect_stdout`](@ref), but for [`stdin`](@ref).
1328
Note that the direction of the stream is reversed.
1329

1330
!!! note
1331
    `stream` must be a compatible objects, such as an `IOStream`, `TTY`,
1332
    [`Pipe`](@ref), socket, or `devnull`.
1333

1334
See also [`redirect_stdio`](@ref).
1335
"""
1336
redirect_stdin
1337

1338
"""
1339
    redirect_stdio(;stdin=stdin, stderr=stderr, stdout=stdout)
1340

1341
Redirect a subset of the streams `stdin`, `stderr`, `stdout`.
1342
Each argument must be an `IOStream`, `TTY`, [`Pipe`](@ref), socket, or
1343
`devnull`.
1344

1345
!!! compat "Julia 1.7"
1346
    `redirect_stdio` requires Julia 1.7 or later.
1347
"""
1348
function redirect_stdio(;stdin=nothing, stderr=nothing, stdout=nothing)
48✔
1349
    stdin  === nothing || redirect_stdin(stdin)
38✔
1350
    stderr === nothing || redirect_stderr(stderr)
44✔
1351
    stdout === nothing || redirect_stdout(stdout)
36✔
1352
end
1353

1354
"""
1355
    redirect_stdio(f; stdin=nothing, stderr=nothing, stdout=nothing)
1356

1357
Redirect a subset of the streams `stdin`, `stderr`, `stdout`,
1358
call `f()` and restore each stream.
1359

1360
Possible values for each stream are:
1361
* `nothing` indicating the stream should not be redirected.
1362
* `path::AbstractString` redirecting the stream to the file at `path`.
1363
* `io` an `IOStream`, `TTY`, [`Pipe`](@ref), socket, or `devnull`.
1364

1365
# Examples
1366
```julia-repl
1367
julia> redirect_stdio(stdout="stdout.txt", stderr="stderr.txt") do
1368
           print("hello stdout")
1369
           print(stderr, "hello stderr")
1370
       end
1371

1372
julia> read("stdout.txt", String)
1373
"hello stdout"
1374

1375
julia> read("stderr.txt", String)
1376
"hello stderr"
1377
```
1378

1379
# Edge cases
1380

1381
It is possible to pass the same argument to `stdout` and `stderr`:
1382
```julia-repl
1383
julia> redirect_stdio(stdout="log.txt", stderr="log.txt", stdin=devnull) do
1384
    ...
1385
end
1386
```
1387

1388
However it is not supported to pass two distinct descriptors of the same file.
1389
```julia-repl
1390
julia> io1 = open("same/path", "w")
1391

1392
julia> io2 = open("same/path", "w")
1393

1394
julia> redirect_stdio(f, stdout=io1, stderr=io2) # not supported
1395
```
1396
Also the `stdin` argument may not be the same descriptor as `stdout` or `stderr`.
1397
```julia-repl
1398
julia> io = open(...)
1399

1400
julia> redirect_stdio(f, stdout=io, stdin=io) # not supported
1401
```
1402

1403
!!! compat "Julia 1.7"
1404
    `redirect_stdio` requires Julia 1.7 or later.
1405
"""
1406
function redirect_stdio(f; stdin=nothing, stderr=nothing, stdout=nothing)
54✔
1407

1408
    function resolve(new::Nothing, oldstream, mode)
27✔
1409
        (new=nothing, close=false, old=nothing)
21✔
1410
    end
1411
    function resolve(path::AbstractString, oldstream,mode)
1412
        (new=open(path, mode), close=true, old=oldstream)
21✔
1413
    end
1414
    function resolve(new, oldstream, mode)
1415
        (new=new, close=false, old=oldstream)
9✔
1416
    end
1417

1418
    same_path(x, y) = false
27✔
1419
    function same_path(x::AbstractString, y::AbstractString)
1420
        # if x = y = "does_not_yet_exist.txt" then samefile will return false
1421
        (abspath(x) == abspath(y)) || samefile(x,y)
24✔
1422
    end
1423
    if same_path(stderr, stdin)
27✔
1424
        throw(ArgumentError("stdin and stderr cannot be the same path"))
6✔
1425
    end
1426
    if same_path(stdout, stdin)
21✔
1427
        throw(ArgumentError("stdin and stdout cannot be the same path"))
3✔
1428
    end
1429

1430
    new_in , close_in , old_in  = resolve(stdin , Base.stdin , "r")
18✔
1431
    new_out, close_out, old_out = resolve(stdout, Base.stdout, "w")
18✔
1432
    if same_path(stderr, stdout)
24✔
1433
        # make sure that in case stderr = stdout = "same/path"
1434
        # only a single io is used instead of opening the same file twice
1435
        new_err, close_err, old_err = new_out, false, Base.stderr
3✔
1436
    else
1437
        new_err, close_err, old_err = resolve(stderr, Base.stderr, "w")
15✔
1438
    end
1439

1440
    redirect_stdio(; stderr=new_err, stdin=new_in, stdout=new_out)
20✔
1441

1442
    try
18✔
1443
        return f()
18✔
1444
    finally
1445
        redirect_stdio(;stderr=old_err, stdin=old_in, stdout=old_out)
18✔
1446
        close_err && close(new_err)
18✔
1447
        close_in  && close(new_in )
18✔
1448
        close_out && close(new_out)
18✔
1449
    end
1450
end
1451

1452
function (f::RedirectStdStream)(thunk::Function, stream)
82✔
1453
    stdold = f.unix_fd == 0 ? stdin :
158✔
1454
             f.unix_fd == 1 ? stdout :
1455
             f.unix_fd == 2 ? stderr :
1456
             throw(ArgumentError("Not implemented to get old handle of fd except for stdio"))
1457
    f(stream)
112✔
1458
    try
82✔
1459
        return thunk()
85✔
1460
    finally
1461
        f(stdold)
82✔
1462
    end
1463
end
1464

1465

1466
"""
1467
    redirect_stdout(f::Function, stream)
1468

1469
Run the function `f` while redirecting [`stdout`](@ref) to `stream`.
1470
Upon completion, [`stdout`](@ref) is restored to its prior setting.
1471
"""
1472
redirect_stdout(f::Function, stream)
1473

1474
"""
1475
    redirect_stderr(f::Function, stream)
1476

1477
Run the function `f` while redirecting [`stderr`](@ref) to `stream`.
1478
Upon completion, [`stderr`](@ref) is restored to its prior setting.
1479
"""
1480
redirect_stderr(f::Function, stream)
1481

1482
"""
1483
    redirect_stdin(f::Function, stream)
1484

1485
Run the function `f` while redirecting [`stdin`](@ref) to `stream`.
1486
Upon completion, [`stdin`](@ref) is restored to its prior setting.
1487
"""
1488
redirect_stdin(f::Function, stream)
1489

1490
mark(x::LibuvStream)     = mark(x.buffer)
8,050✔
1491
unmark(x::LibuvStream)   = unmark(x.buffer)
6✔
1492
reset(x::LibuvStream)    = reset(x.buffer)
8,050✔
1493
ismarked(x::LibuvStream) = ismarked(x.buffer)
12✔
1494

1495
function peek(s::LibuvStream, ::Type{T}) where T
8,044✔
1496
    mark(s)
8,044✔
1497
    try read(s, T)
8,044✔
1498
    finally
1499
        reset(s)
8,044✔
1500
    end
1501
end
1502

1503
# BufferStream's are non-OS streams, backed by a regular IOBuffer
1504
mutable struct BufferStream <: LibuvStream
1505
    buffer::IOBuffer
1506
    cond::Threads.Condition
1507
    readerror::Any
1508
    buffer_writes::Bool
1509
    lock::ReentrantLock # advisory lock
1510
    status::Int
1511

1512
    BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, false, ReentrantLock(), StatusActive)
78✔
1513
end
1514

1515
isopen(s::BufferStream) = s.status != StatusClosed
3✔
1516

1517
closewrite(s::BufferStream) = close(s)
2✔
1518

1519
function close(s::BufferStream)
1520
    lock(s.cond) do
2✔
1521
        s.status = StatusClosed
1522
        notify(s.cond) # aka flush
1523
        nothing
1524
    end
1525
end
1526
uvfinalize(s::BufferStream) = nothing
×
1527
setup_stdio(stream::BufferStream, child_readable::Bool) = invoke(setup_stdio, Tuple{IO, Bool}, stream, child_readable)
×
1528

1529
function read(s::BufferStream, ::Type{UInt8})
1530
    nread = lock(s.cond) do
674,571✔
1531
        wait_readnb(s, 1)
1532
        read(s.buffer, UInt8)
1533
    end
1534
    return nread
×
1535
end
1536
function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt)
1537
    lock(s.cond) do
3✔
1538
        wait_readnb(s, Int(nb))
1539
        unsafe_read(s.buffer, a, nb)
1540
        nothing
1541
    end
1542
end
1543
bytesavailable(s::BufferStream) = bytesavailable(s.buffer)
674,418✔
1544

1545
isreadable(s::BufferStream) = (isopen(s) || bytesavailable(s) > 0) && s.buffer.readable
×
1546
iswritable(s::BufferStream) = isopen(s) && s.buffer.writable
×
1547

1548
function wait_readnb(s::BufferStream, nb::Int)
×
1549
    lock(s.cond) do
×
1550
        while isopen(s) && bytesavailable(s.buffer) < nb
×
1551
            wait(s.cond)
×
1552
        end
×
1553
    end
1554
end
1555

1556
function readavailable(this::BufferStream)
×
1557
    bytes = lock(this.cond) do
×
1558
        wait_readnb(this, 1)
×
1559
        buf = this.buffer
×
1560
        @assert buf.seekable == false
×
1561
        take!(buf)
×
1562
    end
1563
    return bytes
×
1564
end
1565

1566
function read(stream::BufferStream)
1567
    bytes = lock(stream.cond) do
44✔
1568
        wait_close(stream)
1569
        take!(stream.buffer)
1570
    end
1571
    return bytes
×
1572
end
1573

1574
function readbytes!(s::BufferStream, a::Vector{UInt8}, nb::Int)
1575
    sbuf = s.buffer
3✔
1576
    @assert sbuf.seekable == false
3✔
1577
    @assert sbuf.maxsize >= nb
3✔
1578

1579
    function wait_locked(s, buf, nb)
×
1580
        while bytesavailable(buf) < nb
1581
            s.readerror === nothing || throw(s.readerror)
1582
            isopen(s) || break
1583
            s.status != StatusEOF || break
1584
            wait_readnb(s, nb)
1585
        end
1586
    end
1587

1588
    bytes = lock(s.cond) do
3✔
1589
        if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
1590
            wait_locked(s, sbuf, nb)
1591
        end
1592
        if bytesavailable(sbuf) >= nb
1593
            nread = readbytes!(sbuf, a, nb)
1594
        else
1595
            initsize = length(a)
1596
            newbuf = _truncated_pipebuffer(a; maxsize=nb)
1597
            nread = try
1598
                s.buffer = newbuf
1599
                write(newbuf, sbuf)
1600
                wait_locked(s, newbuf, nb)
1601
                bytesavailable(newbuf)
1602
            finally
1603
                s.buffer = sbuf
1604
            end
1605
            _take!(a, _unsafe_take!(newbuf))
1606
            length(a) >= initsize || resize!(a, initsize)
1607
        end
1608
        return nread
1609
    end
1610
    return bytes
3✔
1611
end
1612

1613
show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")")
3✔
1614

1615
function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)
×
1616
    bytes = lock(s.cond) do
×
1617
        while isopen(s) && !occursin(c, s.buffer)
×
1618
            wait(s.cond)
×
1619
        end
×
1620
        readuntil(s.buffer, c, keep=keep)
×
1621
    end
1622
    return bytes
×
1623
end
1624

1625
function wait_close(s::BufferStream)
×
1626
    lock(s.cond) do
×
1627
        while isopen(s)
×
1628
            wait(s.cond)
×
1629
        end
×
1630
    end
1631
end
1632

1633
start_reading(s::BufferStream) = Int32(0)
×
1634
stop_reading(s::BufferStream) = nothing
×
1635

1636
write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b))
×
1637
function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt)
1638
    nwrite = lock(s.cond) do
393✔
1639
        check_open(s)
1640
        rv = unsafe_write(s.buffer, p, nb)
1641
        s.buffer_writes || notify(s.cond)
1642
        rv
1643
    end
1644
    return nwrite
×
1645
end
1646

1647
function eof(s::BufferStream)
1648
    bytesavailable(s) > 0 && return false
674,250✔
1649
    iseof = lock(s.cond) do
129✔
1650
        wait_readnb(s, 1)
1651
        return !isopen(s) && bytesavailable(s) <= 0
1652
    end
1653
    return iseof
129✔
1654
end
1655

1656
# If buffer_writes is called, it will delay notifying waiters till a flush is called.
1657
buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s)
×
1658
function flush(s::BufferStream)
×
1659
    lock(s.cond) do
×
1660
        check_open(s)
×
1661
        notify(s.cond)
×
1662
        nothing
×
1663
    end
1664
end
1665

1666
skip(s::BufferStream, n) = skip(s.buffer, n)
×
1667

1668
function reseteof(s::BufferStream)
×
1669
    lock(s.cond) do
×
1670
        s.status = StatusOpen
×
1671
        nothing
×
1672
    end
1673
    nothing
×
1674
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc