• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37474

pending completion
#37474

push

local

web-flow
irinterp: Allow setting all IR flags (#48993)

Currently, `IR_FLAG_NOTHROW` is the only flag that irinterp is allowed to
set on statements, under the assumption that in order for a call to
be irinterp-eligible, it must have been proven `:foldable`, thus `:effect_free`,
and thus `IR_FLAG_EFFECT_FREE` was assumed to have been set. That reasoning
was sound at the time this code was written, but we have since introduced
`EFFECT_FREE_IF_INACCESSIBLEMEMONLY`, which breaks the reasoning that
an `:effect_free` inference for the whole function implies the flag on
every statement. As a result, we were failing to DCE otherwise dead
statements if the IR came from irinterp.

3 of 3 new or added lines in 1 file covered. (100.0%)

70258 of 82316 relevant lines covered (85.35%)

32461773.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.38
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
1✔
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
7,772,095✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
98✔
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

5✔
16
        # Process bt_raw so that it can be safely serialized
1✔
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
98✔
18
        CapturedException(ex, bt_lines)
98✔
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
187✔
22
end
23

24
function showerror(io::IO, ce::CapturedException)
5✔
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
5✔
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
6✔
35
"""
6✔
36
capture_exception(ex, bt) = CapturedException(ex, bt)
1✔
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
50✔
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
146✔
49
    CompositeException(exceptions) = new(exceptions)
13✔
50
end
51
length(c::CompositeException) = length(c.exceptions)
10✔
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
13✔
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
138✔
54
isempty(c::CompositeException) = isempty(c.exceptions)
10✔
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
16✔
56
eltype(::Type{CompositeException}) = Any
9✔
57

58
function showerror(io::IO, ex::CompositeException)
10✔
59
    if !isempty(ex)
10✔
60
        showerror(io, ex.exceptions[1])
31✔
61
        remaining = length(ex) - 1
9✔
62
        if remaining > 0
9✔
63
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
4✔
64
        end
65
    else
66
        print(io, "CompositeException()\n")
73✔
67
    end
68
end
69

70
"""
71
    TaskFailedException
72

73
This exception is thrown by a `wait(t)` call when task `t` fails.
74
`TaskFailedException` wraps the failed task `t`.
75
"""
76
struct TaskFailedException <: Exception
77
    task::Task
1,701✔
78
end
79

80
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
37✔
81
    print(io, "TaskFailedException")
13✔
82
    if bt !== nothing && backtrace
13✔
83
        show_backtrace(io, bt)
2✔
84
    end
85
    println(io)
13✔
86
    printstyled(io, "\n    nested task error: ", color=error_color())
13✔
87
    show_task_exception(io, ex.task)
13✔
88
end
89

90
function show_task_exception(io::IO, t::Task; indent = true)
26✔
91
    stack = current_exceptions(t)
13✔
92
    b = IOBuffer()
13✔
93
    if isempty(stack)
13✔
94
        # exception stack buffer not available; probably a serialized task
95
        showerror(IOContext(b, io), t.result)
1✔
96
    else
97
        show_exception_stack(IOContext(b, io), stack)
12✔
98
    end
99
    str = String(take!(b))
13✔
100
    if indent
13✔
101
        str = replace(str, "\n" => "\n    ")
13✔
102
    end
103
    print(io, str)
13✔
104
end
105

106
function show(io::IO, t::Task)
×
107
    print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
108
end
109

110
"""
111
    @task
112

113
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
114
creates a task, and does not run it.
115

116
# Examples
117
```jldoctest
118
julia> a1() = sum(i for i in 1:1000);
80,000✔
119

90,000✔
120
julia> b = @task a1();
121

122
julia> istaskstarted(b)
123
false
124

125
julia> schedule(b);
126

127
julia> yield();
128

129
julia> istaskdone(b)
130
true
131
```
132
"""
133
macro task(ex)
22✔
134
    :(Task(()->$(esc(ex))))
1,506✔
135
end
136

137
"""
138
    current_task()
139

140
Get the currently running [`Task`](@ref).
141
"""
142
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
142,170,220✔
143

144
# task states
145

146
const task_state_runnable = UInt8(0)
147
const task_state_done     = UInt8(1)
148
const task_state_failed   = UInt8(2)
149

150
const _state_index = findfirst(==(:_state), fieldnames(Task))
151
@eval function load_state_acquire(t)
3✔
152
    # TODO: Replace this by proper atomic operations when available
153
    @GC.preserve t llvmcall($("""
18,482,996✔
154
        %ptr = inttoptr i$(Sys.WORD_SIZE) %0 to i8*
155
        %rv = load atomic i8, i8* %ptr acquire, align 8
156
        ret i8 %rv
157
        """), UInt8, Tuple{Ptr{UInt8}},
158
        Ptr{UInt8}(pointer_from_objref(t) + fieldoffset(Task, _state_index)))
159
end
160

161
@inline function getproperty(t::Task, field::Symbol)
6,883,881✔
162
    if field === :state
6,883,881✔
163
        # TODO: this field name should be deprecated in 2.0
4✔
164
        st = load_state_acquire(t)
11✔
165
        if st === task_state_runnable
11✔
166
            return :runnable
1✔
167
        elseif st === task_state_done
10✔
168
            return :done
6✔
169
        elseif st === task_state_failed
8✔
170
            return :failed
4✔
171
        else
172
            @assert false
×
173
        end
174
    elseif field === :backtrace
6,883,870✔
175
        # TODO: this field name should be deprecated in 2.0
176
        return current_exceptions(t)[end][2]
×
177
    elseif field === :exception
6,883,870✔
178
        # TODO: this field name should be deprecated in 2.0
179
        return t._isexception ? t.result : nothing
18✔
180
    else
181
        return getfield(t, field)
460,234,692✔
182
    end
183
end
184

185
"""
186
    istaskdone(t::Task) -> Bool
187

188
Determine whether a task has exited.
189

190
# Examples
191
```jldoctest
192
julia> a2() = sum(i for i in 1:1000);
193

194
julia> b = Task(a2);
3✔
195

196
julia> istaskdone(b)
197
false
198

199
julia> schedule(b);
200

201
julia> yield();
202

203
julia> istaskdone(b)
204
true
205
```
206
"""
207
istaskdone(t::Task) = load_state_acquire(t) !== task_state_runnable
9,876,037✔
208

209
"""
210
    istaskstarted(t::Task) -> Bool
211

212
Determine whether a task has started executing.
213

214
# Examples
215
```jldoctest
216
julia> a3() = sum(i for i in 1:1000);
217

218
julia> b = Task(a3);
219

220
julia> istaskstarted(b)
221
false
222
```
223
"""
224
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
4✔
225

226
"""
227
    istaskfailed(t::Task) -> Bool
228

229
Determine whether a task has exited because an exception was thrown.
230

231
# Examples
232
```jldoctest
233
julia> a4() = error("task failed");
234

235
julia> b = Task(a4);
236

237
julia> istaskfailed(b)
238
false
239

240
julia> schedule(b);
241

242
julia> yield();
243

244
julia> istaskfailed(b)
245
true
246
```
247

248
!!! compat "Julia 1.3"
249
    This function requires at least Julia 1.3.
250
"""
251
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)
8,610,663✔
252

253
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
5,246,616✔
254
function Threads.threadpool(t::Task)
×
255
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
4,664,722✔
256
    return tpid == 0 ? :default : :interactive
4,664,843✔
257
end
258

259
task_result(t::Task) = t.result
4,938,412✔
260

261
task_local_storage() = get_task_tls(current_task())
39,878,868✔
262
function get_task_tls(t::Task)
×
263
    if t.storage === nothing
39,878,962✔
264
        t.storage = IdDict()
903✔
265
    end
266
    return (t.storage)::IdDict{Any,Any}
39,878,942✔
267
end
268

269
"""
270
    task_local_storage(key)
271

272
Look up the value of a key in the current task's task-local storage.
273
"""
274
task_local_storage(key) = task_local_storage()[key]
1,473✔
275

276
"""
277
    task_local_storage(key, value)
278

279
Assign a value to a key in the current task's task-local storage.
280
"""
281
task_local_storage(key, val) = (task_local_storage()[key] = val)
1✔
282

283
"""
284
    task_local_storage(body, key, value)
285

286
Call the function `body` with a modified task-local storage, in which `value` is assigned to
287
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
288
for emulating dynamic scoping.
289
"""
290
function task_local_storage(body::Function, key, val)
×
291
    tls = task_local_storage()
×
292
    hadkey = haskey(tls, key)
×
293
    old = get(tls, key, nothing)
×
294
    tls[key] = val
×
295
    try
×
296
        return body()
×
297
    finally
298
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
299
    end
300
end
301

302
# just wait for a task to be done, no error propagation
303
function _wait(t::Task)
4,612,068✔
304
    if !istaskdone(t)
4,612,045✔
305
        lock(t.donenotify)
2,442,252✔
306
        try
2,442,341✔
307
            while !istaskdone(t)
4,883,259✔
308
                wait(t.donenotify)
2,441,502✔
309
            end
2,441,286✔
310
        finally
311
            unlock(t.donenotify)
2,441,195✔
312
        end
313
    end
314
    nothing
4,611,597✔
315
end
316

317
# have `waiter` wait for `t`
318
function _wait2(t::Task, waiter::Task)
120,073✔
319
    if !istaskdone(t)
120,073✔
320
        lock(t.donenotify)
120,073✔
321
        if !istaskdone(t)
120,073✔
322
            push!(t.donenotify.waitq, waiter)
120,073✔
323
            unlock(t.donenotify)
120,073✔
324
            # since _wait2 is similar to schedule, we should observe the sticky
325
            # bit, even if we aren't calling `schedule` due to this early-return
326
            if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
120,073✔
327
                # Issue #41324
328
                # t.sticky && tid == 0 is a task that needs to be co-scheduled with
329
                # the parent task. If the parent (current_task) is not sticky we must
330
                # set it to be sticky.
331
                # XXX: Ideally we would be able to unset this
332
                current_task().sticky = true
287✔
333
                tid = Threads.threadid()
287✔
334
                ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
287✔
335
            end
336
            return nothing
120,073✔
337
        else
338
            unlock(t.donenotify)
×
339
        end
340
    end
341
    schedule(waiter)
×
342
    nothing
×
343
end
344

345
function wait(t::Task)
1,193,336✔
346
    t === current_task() && error("deadlock detected: cannot wait on current task")
2,253,638✔
347
    _wait(t)
2,253,648✔
348
    if istaskfailed(t)
2,253,268✔
349
        throw(TaskFailedException(t))
1,550✔
350
    end
351
    nothing
2,251,657✔
352
end
353

354
"""
355
    fetch(x::Any)
356

357
Return `x`.
358
"""
359
fetch(@nospecialize x) = x
×
360

361
"""
362
    fetch(t::Task)
363

364
Wait for a Task to finish, then return its result value.
365
If the task fails with an exception, a `TaskFailedException` (which wraps the failed task)
366
is thrown.
367
"""
368
function fetch(t::Task)
1,058,903✔
369
    wait(t)
1,058,967✔
370
    return task_result(t)
1,058,554✔
371
end
372

373

374
## lexically-scoped waiting for multiple items
375

376
struct ScheduledAfterSyncException <: Exception
377
    values::Vector{Any}
141✔
378
end
379

380
function showerror(io::IO, ex::ScheduledAfterSyncException)
3✔
381
    print(io, "ScheduledAfterSyncException: ")
3✔
382
    if isempty(ex.values)
3✔
383
        print(io, "(no values)")
×
384
        return
×
385
    end
386
    show(io, ex.values[1])
3✔
387
    if length(ex.values) == 1
3✔
388
        print(io, " is")
1✔
389
    elseif length(ex.values) == 2
2✔
390
        print(io, " and one more ")
1✔
391
        print(io, nameof(typeof(ex.values[2])))
1✔
392
        print(io, " are")
1✔
393
    else
394
        print(io, " and ", length(ex.values) - 1, " more objects are")
1✔
395
    end
396
    print(io, " registered after the end of a `@sync` block")
3✔
397
end
398

399
function sync_end(c::Channel{Any})
4,397✔
400
    local c_ex
×
401
    while isready(c)
1,151,313✔
402
        r = take!(c)
1,146,916✔
403
        if isa(r, Task)
1,146,916✔
404
            _wait(r)
1,146,733✔
405
            if istaskfailed(r)
1,146,733✔
406
                if !@isdefined(c_ex)
10✔
407
                    c_ex = CompositeException()
5✔
408
                end
409
                push!(c_ex, TaskFailedException(r))
10✔
410
            end
411
        else
412
            try
183✔
413
                wait(r)
183✔
414
            catch e
415
                if !@isdefined(c_ex)
3✔
416
                    c_ex = CompositeException()
2✔
417
                end
418
                push!(c_ex, e)
3✔
419
            end
420
        end
421
    end
1,146,916✔
422
    close(c)
4,397✔
423

424
    # Capture all waitable objects scheduled after the end of `@sync` and
425
    # include them in the exception. This way, the user can check what was
426
    # scheduled by examining the exception object.
427
    if isready(c)
4,396✔
428
        local racy
×
429
        for r in c
138✔
430
            if !@isdefined(racy)
138✔
431
                racy = []
138✔
432
            end
433
            push!(racy, r)
138✔
434
        end
138✔
435
        if @isdefined(racy)
138✔
436
            if !@isdefined(c_ex)
138✔
437
                c_ex = CompositeException()
138✔
438
            end
439
            # Since this is a clear programming error, show this exception first:
440
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
138✔
441
        end
442
    end
443

444
    if @isdefined(c_ex)
4,397✔
445
        throw(c_ex)
145✔
446
    end
447
    nothing
4,252✔
448
end
449

450
const sync_varname = gensym(:sync)
451

452
"""
453
    @sync
454

455
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn), `@spawnat` and `@distributed`
456
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
457
a [`CompositeException`](@ref).
458

459
# Examples
460
```julia-repl
461
julia> Threads.nthreads()
462
4
463

464
julia> @sync begin
465
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
466
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
467
       end;
468
Thread-id 3, task 1
469
Thread-id 1, task 2
470
```
471
"""
472
macro sync(block)
46✔
473
    var = esc(sync_varname)
46✔
474
    quote
46✔
475
        let $var = Channel(Inf)
4,397✔
476
            v = $(esc(block))
66,767✔
477
            sync_end($var)
4,397✔
478
            v
4,191✔
479
        end
480
    end
481
end
482

483
# schedule an expression to run asynchronously
484

485
"""
486
    @async
487

488
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
489

490
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
491
constructed underlying closure. This allows you to insert the _value_ of a variable,
492
isolating the asynchronous code from changes to the variable's value in the current task.
493

494
!!! warning
60✔
495
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
496
    parallelism is required** especially in publicly distributed libraries.  This is
497
    because a use of `@async` disables the migration of the *parent* task across worker
498
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
499
    `@async` in a library function can have a large impact on the performance of very
500
    different parts of user applications.
501

502
!!! compat "Julia 1.4"
503
    Interpolating values via `\$` is available as of Julia 1.4.
504
"""
505
macro async(expr)
235✔
506
    do_async_macro(expr)
235✔
507
end
508

509
# generate the code for @async, possibly wrapping the task in something before
510
# pushing it to the wait queue.
511
function do_async_macro(expr; wrap=identity)
460✔
512
    letargs = Base._lift_one_interp!(expr)
230✔
513

514
    thunk = esc(:(()->($expr)))
272,104✔
515
    var = esc(sync_varname)
230✔
516
    quote
230✔
517
        let $(letargs...)
109✔
518
            local task = Task($thunk)
206,142✔
519
            if $(Expr(:islocal, var))
205,438✔
520
                put!($var, $(wrap(:task)))
82,909✔
521
            end
522
            schedule(task)
206,143✔
523
            task
205,395✔
524
        end
525
    end
526
end
527

528
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
529
struct UnwrapTaskFailedException <: Exception
530
    task::Task
167✔
531
end
532

533
# common code for wait&fetch for UnwrapTaskFailedException
534
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
167✔
535
    try
167✔
536
        f(t.task)
167✔
537
    catch ex
538
        if ex isa TaskFailedException
3✔
539
            throw(ex.task.exception)
3✔
540
        else
541
            rethrow()
×
542
        end
543
    end
544
end
545

546
# the unwrapping for above task wrapper (gets triggered in sync_end())
547
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
167✔
548

549
# same for fetching the tasks, for convenience
550
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
551

552
# macro for running async code that doesn't throw wrapped exceptions
553
macro async_unwrap(expr)
554
    do_async_macro(expr, wrap=task->:(Base.UnwrapTaskFailedException($task)))
×
555
end
556

557
"""
558
    errormonitor(t::Task)
559

560
Print an error log to `stderr` if task `t` fails.
561

562
# Examples
563
```julia-repl
564
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
565
Unhandled Task ERROR: task failed
566
Stacktrace:
567
[...]
568
```
569
"""
131,070✔
570
function errormonitor(t::Task)
117,637✔
571
    t2 = Task() do
119,814✔
572
        if istaskfailed(t)
118,424✔
573
            local errs = stderr
1✔
574
            try # try to display the failure atomically
1✔
575
                errio = IOContext(PipeBuffer(), errs::IO)
1✔
576
                emphasize(errio, "Unhandled Task ")
2✔
577
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
1✔
578
                write(errs, errio)
1✔
579
            catch
580
                try # try to display the secondary error atomically
×
581
                    errio = IOContext(PipeBuffer(), errs::IO)
×
582
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
583
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
584
                    write(errs, errio)
×
585
                    flush(errs)
×
586
                    # and then the actual error, as best we can
587
                    Core.print(Core.stderr, "while handling: ")
×
588
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
589
                catch e
590
                    # give up
591
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
592
                            " while trying to print a failed Task notice; giving up\n")
593
                end
594
            end
595
        end
596
        nothing
118,424✔
597
    end
598
    t2.sticky = false
119,814✔
599
    _wait2(t, t2)
119,814✔
600
    return t
1✔
601
end
602

603
# Capture interpolated variables in $() and move them to let-block
604
function _lift_one_interp!(e)
100✔
605
    letargs = Any[]  # store the new gensymed arguments
330✔
606
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
321✔
607
    letargs
100✔
608
end
609
_lift_one_interp_helper(v, _, _) = v
×
610
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
1,350✔
611
    if expr.head === :$
1,350✔
612
        if in_quote_context  # This $ is simply interpolating out of the quote
124✔
613
            # Now, we're out of the quote, so any _further_ $ is ours.
614
            in_quote_context = false
×
615
        else
616
            newarg = gensym()
100✔
617
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
100✔
618
            return newarg  # Don't recurse into the lifted $() exprs
124✔
619
        end
620
    elseif expr.head === :quote
1,226✔
621
        in_quote_context = true   # Don't try to lift $ directly out of quotes
20✔
622
    elseif expr.head === :macrocall
1,206✔
623
        return expr  # Don't recur into macro calls, since some other macros use $
72✔
624
    end
625
    for (i,e) in enumerate(expr.args)
2,352✔
626
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
4,979✔
627
    end
4,834✔
628
    expr
1,178✔
629
end
630

631

632
# add a wait-able object to the sync pool
633
macro sync_add(expr)
634
    var = esc(sync_varname)
635
    quote
636
        local ref = $(esc(expr))
637
        put!($var, ref)
638
        ref
639
    end
640
end
641

642
# runtime system hook called when a task finishes
643
function task_done_hook(t::Task)
3,881,956✔
644
    # `finish_task` sets `sigatomic` before entering this function
645
    err = istaskfailed(t)
3,880,919✔
646
    result = task_result(t)
3,881,331✔
647
    handled = false
×
648

649
    donenotify = t.donenotify
3,881,175✔
650
    if isa(donenotify, ThreadSynchronizer)
3,881,129✔
651
        lock(donenotify)
3,881,113✔
652
        try
3,881,361✔
653
            if !isempty(donenotify.waitq)
3,880,876✔
654
                handled = true
2,559,920✔
655
                notify(donenotify)
2,559,922✔
656
            end
657
        finally
658
            unlock(donenotify)
7,758,647✔
659
        end
660
    end
661

662
    if err && !handled && Threads.threadid() == 1
3,880,083✔
663
        if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
12✔
664
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
665
            active_repl_backend.in_eval
666
            throwto(active_repl_backend.backend_task, result) # this terminates the task
×
667
        end
668
    end
669
    # Clear sigatomic before waiting
670
    sigatomic_end()
3,880,020✔
671
    try
3,879,887✔
672
        wait() # this will not return
3,879,505✔
673
    catch e
674
        # If an InterruptException happens while blocked in the event loop, try handing
675
        # the exception to the REPL task since the current task is done.
676
        # issue #19467
677
        if Threads.threadid() == 1 &&
×
678
            isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
679
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
680
            active_repl_backend.in_eval
681
            throwto(active_repl_backend.backend_task, e)
×
682
        else
683
            rethrow()
×
684
        end
685
    end
686
end
687

688

134✔
689
## scheduler and work queue
690

691
struct IntrusiveLinkedListSynchronized{T}
692
    queue::IntrusiveLinkedList{T}
693
    lock::Threads.SpinLock
694
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
30✔
695
end
696
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
12,111,256✔
697
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
1✔
698
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5,032,592✔
699
    lock(W.lock)
5,032,592✔
700
    try
5,032,593✔
701
        push!(W.queue, t)
8,711,810✔
702
    finally
703
        unlock(W.lock)
10,065,103✔
704
    end
705
    return W
5,032,593✔
706
end
707
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
6✔
708
    lock(W.lock)
6✔
709
    try
6✔
710
        pushfirst!(W.queue, t)
12✔
711
    finally
712
        unlock(W.lock)
12✔
713
    end
714
    return W
6✔
715
end
716
function pop!(W::IntrusiveLinkedListSynchronized)
×
717
    lock(W.lock)
×
718
    try
×
719
        return pop!(W.queue)
×
720
    finally
721
        unlock(W.lock)
×
722
    end
723
end
724
function popfirst!(W::IntrusiveLinkedListSynchronized)
5,032,416✔
725
    lock(W.lock)
5,032,416✔
726
    try
5,032,416✔
727
        return popfirst!(W.queue)
8,711,545✔
728
    finally
729
        unlock(W.lock)
5,032,415✔
730
    end
731
end
732
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
6✔
733
    lock(W.lock)
6✔
734
    try
6✔
735
        list_deletefirst!(W.queue, t)
6✔
736
    finally
737
        unlock(W.lock)
12✔
738
    end
739
    return W
6✔
740
end
741

742
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
743
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
744
const Workqueues_lock = Threads.SpinLock()
745
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
746

747
function workqueue_for(tid::Int)
12,641,669✔
748
    qs = Workqueues
12,641,556✔
749
    if length(qs) >= tid && isassigned(qs, tid)
12,641,610✔
750
        return @inbounds qs[tid]
12,642,233✔
751
    end
752
    # slow path to allocate it
753
    l = Workqueues_lock
×
754
    @lock l begin
30✔
755
        qs = Workqueues
30✔
756
        if length(qs) < tid
30✔
757
            nt = Threads.maxthreadid()
8✔
758
            @assert tid <= nt
8✔
759
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
8✔
760
        end
761
        if !isassigned(qs, tid)
34✔
762
            @inbounds qs[tid] = StickyWorkqueue()
30✔
763
        end
764
        return @inbounds qs[tid]
26✔
765
    end
766
end
767

768
function enq_work(t::Task)
7,611,868✔
769
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
7,611,813✔
770

771
    # Sticky tasks go into their thread's work queue.
772
    if t.sticky
7,611,382✔
773
        tid = Threads.threadid(t)
2,947,172✔
774
        if tid == 0 && !GC.in_finalizer()
2,947,174✔
775
            # The task is not yet stuck to a thread. Stick it to the current
776
            # thread and do the same to the parent task (the current task) so
777
            # that the tasks are correctly co-scheduled (issue #41324).
778
            # XXX: Ideally we would be able to unset this.
779
            tid = Threads.threadid()
273,198✔
780
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
273,198✔
781
            current_task().sticky = true
273,198✔
782
        end
783
        push!(workqueue_for(tid), t)
2,947,175✔
784
    else
785
        tp = Threads.threadpool(t)
4,664,479✔
786
        if Threads.threadpoolsize(tp) == 1
4,665,318✔
787
            # There's only one thread in the task's assigned thread pool;
788
            # use its work queue.
789
            tid = (tp === :default) ? 1 : Threads.threadpoolsize(:default)+1
2,085,422✔
790
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
2,085,421✔
791
            push!(workqueue_for(tid), t)
2,085,421✔
792
        else
793
            # Otherwise, put the task in the multiqueue.
794
            Partr.multiq_insert(t, t.priority)
2,579,936✔
795
            tid = 0
×
796
        end
797
    end
798
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
7,608,734✔
799
    return t
7,613,011✔
800
end
801

802
schedule(t::Task) = enq_work(t)
3,763,438✔
803

804
"""
805
    schedule(t::Task, [val]; error=false)
806

807
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
808
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
809

810
If a second argument `val` is provided, it will be passed to the task (via the return value of
811
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
812
the woken task.
813

814
!!! warning
815
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
816
    See [the API reference](@ref low-level-schedule-wait) for more information.
817

818
# Examples
819
```jldoctest
820
julia> a5() = sum(i for i in 1:1000);
821

822
julia> b = Task(a5);
823

824
julia> istaskstarted(b)
825
false
826

827
julia> schedule(b);
828

829
julia> yield();
830

831
julia> istaskstarted(b)
832
true
833

834
julia> istaskdone(b)
835
true
836
```
837
"""
838
function schedule(t::Task, @nospecialize(arg); error=false)
7,645,069✔
839
    # schedule a task to be (re)started with the given value or exception
840
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
3,822,709✔
841
    if error
3,822,603✔
842
        t.queue === nothing || Base.list_deletefirst!(t.queue, t)
2,281✔
843
        setfield!(t, :result, arg)
2,279✔
844
        setfield!(t, :_isexception, true)
2,279✔
845
    else
846
        t.queue === nothing || Base.error("schedule: Task not runnable")
3,820,305✔
847
        setfield!(t, :result, arg)
3,820,135✔
848
    end
849
    enq_work(t)
3,822,388✔
850
    return t
3,823,763✔
851
end
852

853
"""
854
    yield()
855

856
Switch to the scheduler to allow another scheduled task to run. A task that calls this
857
function is still runnable, and will be restarted immediately if there are no other runnable
858
tasks.
859
"""
860
function yield()
26,396✔
861
    ct = current_task()
26,396✔
862
    enq_work(ct)
26,395✔
863
    try
26,397✔
864
        wait()
26,397✔
865
    catch
866
        ct.queue === nothing || list_deletefirst!(ct.queue, ct)
5✔
867
        rethrow()
5✔
868
    end
869
end
870

871
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
7,608,619✔
872

873
"""
874
    yield(t::Task, arg = nothing)
875

876
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
877
immediately yields to `t` before calling the scheduler.
878
"""
879
function yield(t::Task, @nospecialize(x=nothing))
    # `t` has to be runnable and must not be waiting in any queue.
    if t._state !== task_state_runnable || t.queue !== nothing
        error("yield: Task not runnable")
    end
    t.result = x
    # Keep the current task schedulable, then switch straight to `t`.
    enq_work(current_task())
    set_next_task(t)
    return try_yieldto(ensure_rescheduled)
end
886

887
"""
888
    yieldto(t::Task, arg = nothing)
889

890
Switch to the given task. The first time a task is switched to, the task's function is
891
called with no arguments. On subsequent switches, `arg` is returned from the task's last
892
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
893
or scheduling in any way. Its use is discouraged.
894
"""
895
function yieldto(t::Task, @nospecialize(x=nothing))
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
    # state error instead.
    # A finished task simply echoes `x` back; a failed task rethrows its stored result.
    t._state === task_state_done && return x
    t._state === task_state_failed && throw(t.result)
    # Otherwise hand `x` to the task and switch to it, with no scheduler bookkeeping.
    t.result = x
    set_next_task(t)
    return try_yieldto(identity)
end
907

908
# Perform the task switch via `jl_switch`. If the switch itself throws, `undo` is
# called with the task reported by `jl_get_next_task` and the error is rethrown.
# After the switch, the current task's `result` is consumed: it is thrown when
# `_isexception` is set, and returned otherwise. Both fields are reset first.
function try_yieldto(undo)
    try
        ccall(:jl_switch, Cvoid, ())
    catch
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
        rethrow()
    end
    me = current_task()
    failed = me._isexception
    payload = me.result
    me.result = nothing
    if failed
        me._isexception = false
        throw(payload)
    end
    return payload
end
926

927
# Switch to task `t`, delivering `exc` to it as an exception rather than a value.
function throwto(t::Task, @nospecialize(exc))
    # Stage `exc` as the pending exception of `t` before switching.
    t.result = exc
    t._isexception = true
    set_next_task(t)
    return try_yieldto(identity)
end
934

935
function ensure_rescheduled(othertask::Task)
    me = current_task()
    myqueue = workqueue_for(Threads.threadid())
    if me !== othertask && othertask._state === task_state_runnable
        # The switch to `othertask` did not happen: put it back at the head of a
        # queue so it is retried later. A thread id of 0 falls back to our own queue.
        owner = Threads.threadid(othertask)
        if owner == 0
            pushfirst!(myqueue, othertask)
        else
            pushfirst!(workqueue_for(owner), othertask)
        end
    end
    # If the current task had been queued, take it out again so that it is back in
    # the runnable state before the error is thrown.
    list_deletefirst!(myqueue, me)
    return nothing
end
951

952
# Pop the first runnable task from `W`; when `W` has none, fall back to
# `Partr.multiq_deletemin()`.
function trypoptask(W::StickyWorkqueue)
    while !isempty(W)
        candidate = popfirst!(W)
        candidate._state === task_state_runnable && return candidate
        # assume this somehow got queued twice,
        # probably broken now, but try discarding this switch and keep going
        # can't throw here, because it's probably not the fault of the caller to wait
        # and don't want to use print() here, because that may try to incur a task switch
        ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
            "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
    end
    return Partr.multiq_deletemin()
end
968

969
# Alias for `Partr.multiq_check_empty`; `poptask` passes it to the runtime's
# `jl_task_get_next`. Declared `const` so the module-level binding has a fixed type
# and cannot be rebound by accident.
const checktaskempty = Partr.multiq_check_empty
970

971
@noinline function poptask(W::StickyWorkqueue)
    next = trypoptask(W)
    if next isa Task
        set_next_task(next)
    else
        # Nothing available right now: let the runtime obtain the next task, giving
        # it `trypoptask`, `W` and `checktaskempty` to work with.
        set_next_task(
            ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
        )
    end
    return nothing
end
979

980
function wait()
    GC.safepoint()
    # Select the next task from this thread's work queue and switch to it.
    poptask(workqueue_for(Threads.threadid()))
    value = try_yieldto(ensure_rescheduled)
    # We only get here once this task has come out of the queue again.
    process_events()
    return value
end
989

990
# `pause()` calls the platform's native routine: `Sleep` on Windows, `pause` elsewhere.
if Sys.iswindows()
    function pause()
        # NOTE(review): 0xffffffff is presumably the Win32 INFINITE timeout — confirm.
        return ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
    end
else
    function pause()
        return ccall(:pause, Cvoid, ())
    end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc