• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37594

pending completion
#37594

push

local

web-flow
Move `round(T::Type, x)` docstring above `round(z::Complex, ...)` docstring (#50775)

73676 of 84540 relevant lines covered (87.15%)

32579691.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.25
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
15,777,795✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
59✔
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

16
        # Process bt_raw so that it can be safely serialized
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
59✔
18
        CapturedException(ex, bt_lines)
59✔
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
110✔
22
end
23

24
function showerror(io::IO, ce::CapturedException)
3✔
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
3✔
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
35
"""
36
capture_exception(ex, bt) = CapturedException(ex, bt)
1✔
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
187✔
49
    CompositeException(exceptions) = new(exceptions)
13✔
50
end
51
length(c::CompositeException) = length(c.exceptions)
11✔
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
14✔
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
178✔
54
isempty(c::CompositeException) = isempty(c.exceptions)
11✔
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
7✔
56
eltype(::Type{CompositeException}) = Any
×
57

58
function showerror(io::IO, ex::CompositeException)
11✔
59
    if !isempty(ex)
11✔
60
        showerror(io, ex.exceptions[1])
10✔
61
        remaining = length(ex) - 1
10✔
62
        if remaining > 0
10✔
63
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
4✔
64
        end
65
    else
66
        print(io, "CompositeException()\n")
1✔
67
    end
68
end
69

70
"""
71
    TaskFailedException
72

73
This exception is thrown by a [`wait(t)`](@ref) call when task `t` fails.
74
`TaskFailedException` wraps the failed task `t`.
75
"""
76
struct TaskFailedException <: Exception
77
    task::Task
1,556✔
78
end
79

80
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
35✔
81
    print(io, "TaskFailedException")
12✔
82
    if bt !== nothing && backtrace
12✔
83
        show_backtrace(io, bt)
1✔
84
    end
85
    println(io)
12✔
86
    printstyled(io, "\n    nested task error: ", color=error_color())
12✔
87
    show_task_exception(io, ex.task)
12✔
88
end
89

90
function show_task_exception(io::IO, t::Task; indent = true)
24✔
91
    stack = current_exceptions(t)
12✔
92
    b = IOBuffer()
12✔
93
    if isempty(stack)
12✔
94
        # exception stack buffer not available; probably a serialized task
95
        showerror(IOContext(b, io), t.result)
1✔
96
    else
97
        show_exception_stack(IOContext(b, io), stack)
11✔
98
    end
99
    str = String(take!(b))
12✔
100
    if indent
12✔
101
        str = replace(str, "\n" => "\n    ")
12✔
102
    end
103
    print(io, str)
12✔
104
end
105

106
function show(io::IO, t::Task)
×
107
    state = t.state
×
108
    state_str = "$state" * ((state == :runnable && istaskstarted(t)) ? ", started" : "")
×
109
    print(io, "Task ($state_str) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
110
end
111

112
"""
113
    @task
114

115
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
116
creates a task, and does not run it.
117

118
# Examples
119
```jldoctest
120
julia> a1() = sum(i for i in 1:1000);
121

122
julia> b = @task a1();
123

124
julia> istaskstarted(b)
125
false
126

127
julia> schedule(b);
128

129
julia> yield();
130

131
julia> istaskdone(b)
132
true
133
```
134
"""
135
macro task(ex)
21✔
136
    thunk = Base.replace_linenums!(:(()->$(esc(ex))), __source__)
21✔
137
    :(Task($thunk))
21✔
138
end
139

140
"""
141
    current_task()
142

143
Get the currently running [`Task`](@ref).
144
"""
145
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
213,782,139✔
146

147
# task states
148

149
const task_state_runnable = UInt8(0)
150
const task_state_done     = UInt8(1)
151
const task_state_failed   = UInt8(2)
152

153
const _state_index = findfirst(==(:_state), fieldnames(Task))
154
@eval function load_state_acquire(t)
3✔
155
    # TODO: Replace this by proper atomic operations when available
156
    @GC.preserve t llvmcall($("""
30,477,160✔
157
        %ptr = inttoptr i$(Sys.WORD_SIZE) %0 to i8*
158
        %rv = load atomic i8, i8* %ptr acquire, align 8
159
        ret i8 %rv
160
        """), UInt8, Tuple{Ptr{UInt8}},
161
        Ptr{UInt8}(pointer_from_objref(t) + fieldoffset(Task, _state_index)))
162
end
163

164
@inline function getproperty(t::Task, field::Symbol)
54,874,332✔
165
    if field === :state
54,874,332✔
166
        # TODO: this field name should be deprecated in 2.0
167
        st = load_state_acquire(t)
11✔
168
        if st === task_state_runnable
11✔
169
            return :runnable
1✔
170
        elseif st === task_state_done
10✔
171
            return :done
6✔
172
        elseif st === task_state_failed
4✔
173
            return :failed
4✔
174
        else
175
            @assert false
×
176
        end
177
    elseif field === :backtrace
54,874,321✔
178
        # TODO: this field name should be deprecated in 2.0
179
        return current_exceptions(t)[end][2]
×
180
    elseif field === :exception
54,874,321✔
181
        # TODO: this field name should be deprecated in 2.0
182
        return t._isexception ? t.result : nothing
18✔
183
    else
184
        return getfield(t, field)
685,451,961✔
185
    end
186
end
187

188
"""
189
    istaskdone(t::Task) -> Bool
190

191
Determine whether a task has exited.
192

193
# Examples
194
```jldoctest
195
julia> a2() = sum(i for i in 1:1000);
196

197
julia> b = Task(a2);
198

199
julia> istaskdone(b)
200
false
201

202
julia> schedule(b);
203

204
julia> yield();
205

206
julia> istaskdone(b)
207
true
208
```
209
"""
210
istaskdone(t::Task) = load_state_acquire(t) !== task_state_runnable
13,863,852✔
211

212
"""
213
    istaskstarted(t::Task) -> Bool
214

215
Determine whether a task has started executing.
216

217
# Examples
218
```jldoctest
219
julia> a3() = sum(i for i in 1:1000);
220

221
julia> b = Task(a3);
222

223
julia> istaskstarted(b)
224
false
225
```
226
"""
227
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
4✔
228

229
"""
230
    istaskfailed(t::Task) -> Bool
231

232
Determine whether a task has exited because an exception was thrown.
233

234
# Examples
235
```jldoctest
236
julia> a4() = error("task failed");
237

238
julia> b = Task(a4);
239

240
julia> istaskfailed(b)
241
false
242

243
julia> schedule(b);
244

245
julia> yield();
246

247
julia> istaskfailed(b)
248
true
249
```
250

251
!!! compat "Julia 1.3"
252
    This function requires at least Julia 1.3.
253
"""
254
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)
16,616,939✔
255

256
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
5,130,419✔
257
function Threads.threadpool(t::Task)
×
258
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
8,792,671✔
259
    return Threads._tpid_to_sym(tpid)
17,580,012✔
260
end
261

262
task_result(t::Task) = t.result
8,943,074✔
263

264
task_local_storage() = get_task_tls(current_task())
39,806,756✔
265
function get_task_tls(t::Task)
×
266
    if t.storage === nothing
39,806,810✔
267
        t.storage = IdDict()
909✔
268
    end
269
    return (t.storage)::IdDict{Any,Any}
39,806,778✔
270
end
271

272
"""
273
    task_local_storage(key)
274

275
Look up the value of a key in the current task's task-local storage.
276
"""
277
task_local_storage(key) = task_local_storage()[key]
×
278

279
"""
280
    task_local_storage(key, value)
281

282
Assign a value to a key in the current task's task-local storage.
283
"""
284
task_local_storage(key, val) = (task_local_storage()[key] = val)
1✔
285

286
"""
287
    task_local_storage(body, key, value)
288

289
Call the function `body` with a modified task-local storage, in which `value` is assigned to
290
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
291
for emulating dynamic scoping.
292
"""
293
function task_local_storage(body::Function, key, val)
×
294
    tls = task_local_storage()
×
295
    hadkey = haskey(tls, key)
×
296
    old = get(tls, key, nothing)
×
297
    tls[key] = val
×
298
    try
×
299
        return body()
×
300
    finally
301
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
302
    end
303
end
304

305
# just wait for a task to be done, no error propagation
306
function _wait(t::Task)
8,613,495✔
307
    if !istaskdone(t)
8,613,474✔
308
        donenotify = t.donenotify::ThreadSynchronizer
2,444,170✔
309
        lock(donenotify)
2,444,156✔
310
        try
2,444,331✔
311
            while !istaskdone(t)
4,887,599✔
312
                wait(donenotify)
2,443,591✔
313
            end
2,443,628✔
314
        finally
315
            unlock(donenotify)
4,887,260✔
316
        end
317
    end
318
    nothing
8,613,159✔
319
end
320

321
# have `waiter` wait for `t`
322
function _wait2(t::Task, waiter::Task)
120,769✔
323
    if !istaskdone(t)
120,769✔
324
        # since _wait2 is similar to schedule, we should observe the sticky
325
        # bit, even if we don't call `schedule` with early-return below
326
        if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
120,769✔
327
            # Issue #41324
328
            # t.sticky && tid == 0 is a task that needs to be co-scheduled with
329
            # the parent task. If the parent (current_task) is not sticky we must
330
            # set it to be sticky.
331
            # XXX: Ideally we would be able to unset this
332
            current_task().sticky = true
20✔
333
            tid = Threads.threadid()
20✔
334
            ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
20✔
335
        end
336
        donenotify = t.donenotify::ThreadSynchronizer
120,769✔
337
        lock(donenotify)
120,769✔
338
        if !istaskdone(t)
120,769✔
339
            push!(donenotify.waitq, waiter)
120,810✔
340
            unlock(donenotify)
241,538✔
341
            return nothing
120,769✔
342
        else
343
            unlock(donenotify)
×
344
        end
345
    end
346
    schedule(waiter)
×
347
    nothing
×
348
end
349

350
function wait(t::Task)
1,193,676✔
351
    t === current_task() && error("deadlock detected: cannot wait on current task")
2,254,922✔
352
    _wait(t)
2,255,002✔
353
    if istaskfailed(t)
2,254,674✔
354
        throw(TaskFailedException(t))
1,404✔
355
    end
356
    nothing
2,253,044✔
357
end
358

359
"""
360
    fetch(x::Any)
361

362
Return `x`.
363
"""
364
fetch(@nospecialize x) = x
×
365

366
"""
367
    fetch(t::Task)
368

369
Wait for a [`Task`](@ref) to finish, then return its result value.
370
If the task fails with an exception, a [`TaskFailedException`](@ref) (which wraps the failed task)
371
is thrown.
372
"""
373
function fetch(t::Task)
1,059,934✔
374
    wait(t)
1,059,916✔
375
    return task_result(t)
1,059,403✔
376
end
377

378

379
## lexically-scoped waiting for multiple items
380

381
struct ScheduledAfterSyncException <: Exception
382
    values::Vector{Any}
178✔
383
end
384

385
function showerror(io::IO, ex::ScheduledAfterSyncException)
×
386
    print(io, "ScheduledAfterSyncException: ")
×
387
    if isempty(ex.values)
×
388
        print(io, "(no values)")
×
389
        return
×
390
    end
391
    show(io, ex.values[1])
×
392
    if length(ex.values) == 1
×
393
        print(io, " is")
×
394
    elseif length(ex.values) == 2
×
395
        print(io, " and one more ")
×
396
        print(io, nameof(typeof(ex.values[2])))
×
397
        print(io, " are")
×
398
    else
399
        print(io, " and ", length(ex.values) - 1, " more objects are")
×
400
    end
401
    print(io, " registered after the end of a `@sync` block")
×
402
end
403

404
function sync_end(c::Channel{Any})
4,417✔
405
    local c_ex
×
406
    while isready(c)
5,151,446✔
407
        r = take!(c)
5,147,027✔
408
        if isa(r, Task)
5,147,029✔
409
            _wait(r)
5,146,850✔
410
            if istaskfailed(r)
5,146,850✔
411
                if !@isdefined(c_ex)
11✔
412
                    c_ex = CompositeException()
6✔
413
                end
414
                push!(c_ex, TaskFailedException(r))
11✔
415
            end
416
        else
417
            try
179✔
418
                wait(r)
179✔
419
            catch e
420
                if !@isdefined(c_ex)
3✔
421
                    c_ex = CompositeException()
2✔
422
                end
423
                push!(c_ex, e)
3✔
424
            end
425
        end
426
    end
5,147,029✔
427
    close(c)
4,417✔
428

429
    # Capture all waitable objects scheduled after the end of `@sync` and
430
    # include them in the exception. This way, the user can check what was
431
    # scheduled by examining at the exception object.
432
    if isready(c)
4,417✔
433
        local racy
×
434
        for r in c
179✔
435
            if !@isdefined(racy)
178✔
436
                racy = []
178✔
437
            end
438
            push!(racy, r)
178✔
439
        end
178✔
440
        if @isdefined(racy)
179✔
441
            if !@isdefined(c_ex)
178✔
442
                c_ex = CompositeException()
178✔
443
            end
444
            # Since this is a clear programming error, show this exception first:
445
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
178✔
446
        end
447
    end
448

449
    if @isdefined(c_ex)
4,417✔
450
        throw(c_ex)
186✔
451
    end
452
    nothing
4,231✔
453
end
454

455
const sync_varname = gensym(:sync)
456

457
"""
458
    @sync
459

460
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn), `@spawnat` and `@distributed`
461
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
462
a [`CompositeException`](@ref).
463

464
# Examples
465
```julia-repl
466
julia> Threads.nthreads()
467
4
468

469
julia> @sync begin
470
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
471
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
472
       end;
473
Thread-id 3, task 1
474
Thread-id 1, task 2
475
```
476
"""
477
macro sync(block)
60✔
478
    var = esc(sync_varname)
60✔
479
    quote
60✔
480
        let $var = Channel(Inf)
4,385✔
481
            v = $(esc(block))
66,718✔
482
            sync_end($var)
4,385✔
483
            v
4,140✔
484
        end
485
    end
486
end
487

488
# schedule an expression to run asynchronously
489

490
"""
491
    @async
492

493
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
494

495
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
496
constructed underlying closure. This allows you to insert the _value_ of a variable,
497
isolating the asynchronous code from changes to the variable's value in the current task.
498

499
!!! warning
500
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
501
    parallelism is required** especially in publicly distributed libraries.  This is
502
    because a use of `@async` disables the migration of the *parent* task across worker
503
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
504
    `@async` in a library function can have a large impact on the performance of very
505
    different parts of user applications.
506

507
!!! compat "Julia 1.4"
508
    Interpolating values via `\$` is available as of Julia 1.4.
509
"""
510
macro async(expr)
275✔
511
    do_async_macro(expr, __source__)
275✔
512
end
513

514
# generate the code for @async, possibly wrapping the task in something before
515
# pushing it to the wait queue.
516
function do_async_macro(expr, linenums; wrap=identity)
546✔
517
    letargs = Base._lift_one_interp!(expr)
273✔
518

519
    thunk = Base.replace_linenums!(:(()->($(esc(expr)))), linenums)
273✔
520
    var = esc(sync_varname)
273✔
521
    quote
273✔
522
        let $(letargs...)
109✔
523
            local task = Task($thunk)
206,637✔
524
            if $(Expr(:islocal, var))
205,952✔
525
                put!($var, $(wrap(:task)))
82,845✔
526
            end
527
            schedule(task)
206,637✔
528
            task
205,914✔
529
        end
530
    end
531
end
532

533
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
534
struct UnwrapTaskFailedException <: Exception
535
    task::Task
165✔
536
end
537

538
# common code for wait&fetch for UnwrapTaskFailedException
539
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
165✔
540
    try
165✔
541
        f(t.task)
165✔
542
    catch ex
543
        if ex isa TaskFailedException
3✔
544
            throw(ex.task.exception)
3✔
545
        else
546
            rethrow()
×
547
        end
548
    end
549
end
550

551
# the unwrapping for above task wrapper (gets triggered in sync_end())
552
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
165✔
553

554
# same for fetching the tasks, for convenience
555
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
556

557
# macro for running async code that doesn't throw wrapped exceptions
558
macro async_unwrap(expr)
3✔
559
    do_async_macro(expr, __source__, wrap=task->:(Base.UnwrapTaskFailedException($task)))
6✔
560
end
561

562
"""
563
    errormonitor(t::Task)
564

565
Print an error log to `stderr` if task `t` fails.
566

567
# Examples
568
```julia-repl
569
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
570
Unhandled Task ERROR: task failed
571
Stacktrace:
572
[...]
573
```
574
"""
575
function errormonitor(t::Task)
118,744✔
576
    t2 = Task() do
120,493✔
577
        if istaskfailed(t)
119,588✔
578
            local errs = stderr
×
579
            try # try to display the failure atomically
×
580
                errio = IOContext(PipeBuffer(), errs::IO)
×
581
                emphasize(errio, "Unhandled Task ")
×
582
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
×
583
                write(errs, errio)
×
584
            catch
585
                try # try to display the secondary error atomically
×
586
                    errio = IOContext(PipeBuffer(), errs::IO)
×
587
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
588
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
589
                    write(errs, errio)
×
590
                    flush(errs)
×
591
                    # and then the actual error, as best we can
592
                    Core.print(Core.stderr, "while handling: ")
×
593
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
594
                catch e
595
                    # give up
596
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
597
                            " while trying to print a failed Task notice; giving up\n")
598
                end
599
            end
600
        end
601
        nothing
119,588✔
602
    end
603
    t2.sticky = false
120,493✔
604
    _wait2(t, t2)
120,493✔
605
    return t
2✔
606
end
607

608
# Capture interpolated variables in $() and move them to let-block
609
function _lift_one_interp!(e)
122✔
610
    letargs = Any[]  # store the new gensymed arguments
390✔
611
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
389✔
612
    letargs
122✔
613
end
614
_lift_one_interp_helper(v, _, _) = v
9✔
615
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
1,738✔
616
    if expr.head === :$
1,738✔
617
        if in_quote_context  # This $ is simply interpolating out of the quote
129✔
618
            # Now, we're out of the quote, so any _further_ $ is ours.
619
            in_quote_context = false
×
620
        else
621
            newarg = gensym()
105✔
622
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
105✔
623
            return newarg  # Don't recurse into the lifted $() exprs
129✔
624
        end
625
    elseif expr.head === :quote
1,609✔
626
        in_quote_context = true   # Don't try to lift $ directly out of quotes
20✔
627
    elseif expr.head === :macrocall
1,589✔
628
        return expr  # Don't recur into macro calls, since some other macros use $
83✔
629
    end
630
    for (i,e) in enumerate(expr.args)
3,083✔
631
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
6,460✔
632
    end
6,285✔
633
    expr
1,550✔
634
end
635

636

637
# add a wait-able object to the sync pool
638
macro sync_add(expr)
639
    var = esc(sync_varname)
640
    quote
641
        local ref = $(esc(expr))
642
        put!($var, ref)
643
        ref
644
    end
645
end
646

647
# runtime system hook called when a task finishes
648
function task_done_hook(t::Task)
7,886,283✔
649
    # `finish_task` sets `sigatomic` before entering this function
650
    err = istaskfailed(t)
7,884,827✔
651
    result = task_result(t)
7,885,589✔
652
    handled = false
×
653

654
    donenotify = t.donenotify
7,885,428✔
655
    if isa(donenotify, ThreadSynchronizer)
7,885,453✔
656
        lock(donenotify)
7,885,125✔
657
        try
7,884,895✔
658
            if !isempty(donenotify.waitq)
7,885,024✔
659
                handled = true
2,563,369✔
660
                notify(donenotify)
2,563,346✔
661
            end
662
        finally
663
            unlock(donenotify)
15,765,685✔
664
        end
665
    end
666

667
    if err && !handled && Threads.threadid() == 1
7,883,987✔
668
        if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
9✔
669
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
670
            active_repl_backend.in_eval
671
            throwto(active_repl_backend.backend_task, result) # this terminates the task
×
672
        end
673
    end
674
    # Clear sigatomic before waiting
675
    sigatomic_end()
7,883,844✔
676
    try
7,883,385✔
677
        wait() # this will not return
7,882,837✔
678
    catch e
679
        # If an InterruptException happens while blocked in the event loop, try handing
680
        # the exception to the REPL task since the current task is done.
681
        # issue #19467
682
        if Threads.threadid() == 1 &&
×
683
            isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
684
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
685
            active_repl_backend.in_eval
686
            throwto(active_repl_backend.backend_task, e)
×
687
        else
688
            rethrow()
×
689
        end
690
    end
691
end
692

693

694
## scheduler and work queue
695

696
struct IntrusiveLinkedListSynchronized{T}
697
    queue::IntrusiveLinkedList{T}
698
    lock::Threads.SpinLock
699
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
126✔
700
end
701
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
15,689,863✔
702
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
1✔
703
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5,979,424✔
704
    lock(W.lock)
5,979,425✔
705
    try
5,979,425✔
706
        push!(W.queue, t)
10,828,522✔
707
    finally
708
        unlock(W.lock)
11,958,759✔
709
    end
710
    return W
5,979,426✔
711
end
712
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5✔
713
    lock(W.lock)
5✔
714
    try
5✔
715
        pushfirst!(W.queue, t)
10✔
716
    finally
717
        unlock(W.lock)
10✔
718
    end
719
    return W
5✔
720
end
721
function pop!(W::IntrusiveLinkedListSynchronized)
×
722
    lock(W.lock)
×
723
    try
×
724
        return pop!(W.queue)
×
725
    finally
726
        unlock(W.lock)
×
727
    end
728
end
729
function popfirst!(W::IntrusiveLinkedListSynchronized)
5,979,356✔
730
    lock(W.lock)
5,979,356✔
731
    try
5,979,354✔
732
        return popfirst!(W.queue)
10,828,438✔
733
    finally
734
        unlock(W.lock)
5,979,355✔
735
    end
736
end
737
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5✔
738
    lock(W.lock)
5✔
739
    try
5✔
740
        list_deletefirst!(W.queue, t)
5✔
741
    finally
742
        unlock(W.lock)
10✔
743
    end
744
    return W
5✔
745
end
746

747
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
748
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
749
const Workqueues_lock = Threads.SpinLock()
750
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
751

752
function workqueue_for(tid::Int)
17,660,310✔
753
    qs = Workqueues
17,660,266✔
754
    if length(qs) >= tid && isassigned(qs, tid)
17,660,453✔
755
        return @inbounds qs[tid]
17,661,026✔
756
    end
757
    # slow path to allocate it
758
    @assert tid > 0
126✔
759
    l = Workqueues_lock
×
760
    @lock l begin
126✔
761
        qs = Workqueues
126✔
762
        if length(qs) < tid
126✔
763
            nt = Threads.maxthreadid()
92✔
764
            @assert tid <= nt
92✔
765
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
184✔
766
        end
767
        if !isassigned(qs, tid)
126✔
768
            @inbounds qs[tid] = StickyWorkqueue()
126✔
769
        end
770
        return @inbounds qs[tid]
126✔
771
    end
772
end
773

774
function enq_work(t::Task)
11,683,241✔
775
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
11,683,058✔
776

777
    # Sticky tasks go into their thread's work queue.
778
    if t.sticky
11,683,121✔
779
        tid = Threads.threadid(t)
2,891,206✔
780
        if tid == 0
2,891,206✔
781
            # The task is not yet stuck to a thread. Stick it to the current
782
            # thread and do the same to the parent task (the current task) so
783
            # that the tasks are correctly co-scheduled (issue #41324).
784
            # XXX: Ideally we would be able to unset this.
785
            if GC.in_finalizer()
273,754✔
786
                # The task was launched in a finalizer. There is no thread to sticky it
787
                # to, so just allow it to run anywhere as if it had been non-sticky.
788
                t.sticky = false
4✔
789
                @goto not_sticky
4✔
790
            else
791
                tid = Threads.threadid()
273,750✔
792
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
273,750✔
793
                current_task().sticky = true
273,750✔
794
            end
795
        end
796
        push!(workqueue_for(tid), t)
2,891,201✔
797
    else
798
        @label not_sticky
×
799
        tp = Threads.threadpool(t)
17,580,368✔
800
        if Threads.threadpoolsize(tp) == 1
17,582,114✔
801
            # There's only one thread in the task's assigned thread pool;
802
            # use its work queue.
803
            tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
6,176,446✔
804
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
3,088,224✔
805
            push!(workqueue_for(tid), t)
3,088,224✔
806
        else
807
            # Otherwise, put the task in the multiqueue.
808
            Partr.multiq_insert(t, t.priority)
5,704,582✔
809
            tid = 0
×
810
        end
811
    end
812
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
11,681,743✔
813
    return t
11,685,156✔
814
end
815

816
schedule(t::Task) = enq_work(t)
7,765,698✔
817

818
"""
819
    schedule(t::Task, [val]; error=false)
820

821
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
822
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
823

824
If a second argument `val` is provided, it will be passed to the task (via the return value of
825
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
826
the woken task.
827

828
!!! warning
829
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
830
    See [the API reference](@ref low-level-schedule-wait) for more information.
831

832
# Examples
833
```jldoctest
834
julia> a5() = sum(i for i in 1:1000);
835

836
julia> b = Task(a5);
837

838
julia> istaskstarted(b)
839
false
840

841
julia> schedule(b);
842

843
julia> yield();
844

845
julia> istaskstarted(b)
846
true
847

848
julia> istaskdone(b)
849
true
850
```
851
"""
852
function schedule(t::Task, @nospecialize(arg); error=false)
7,784,201✔
853
    # schedule a task to be (re)started with the given value or exception
854
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
3,892,280✔
855
    if error
3,892,276✔
856
        t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t)
2,213✔
857
        setfield!(t, :result, arg)
2,211✔
858
        setfield!(t, :_isexception, true)
2,211✔
859
    else
860
        t.queue === nothing || Base.error("schedule: Task not runnable")
3,890,041✔
861
        setfield!(t, :result, arg)
3,889,996✔
862
    end
863
    enq_work(t)
3,892,170✔
864
    return t
3,893,362✔
865
end
866

867
"""
868
    yield()
869

870
Switch to the scheduler to allow another scheduled task to run. A task that calls this
871
function is still runnable, and will be restarted immediately if there are no other runnable
872
tasks.
873
"""
874
function yield()
27,154✔
875
    ct = current_task()
27,154✔
876
    enq_work(ct)
27,154✔
877
    try
27,154✔
878
        wait()
27,154✔
879
    catch
880
        ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
4✔
881
        rethrow()
4✔
882
    end
883
end
884

885
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
11,675,903✔
886

887
"""
888
    yield(t::Task, arg = nothing)
889

890
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
891
immediately yields to `t` before calling the scheduler.
892
"""
893
function yield(t::Task, @nospecialize(x=nothing))
90✔
894
    (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
466✔
895
    t.result = x
233✔
896
    enq_work(current_task())
233✔
897
    set_next_task(t)
233✔
898
    return try_yieldto(ensure_rescheduled)
233✔
899
end
900

901
"""
902
    yieldto(t::Task, arg = nothing)
903

904
Switch to the given task. The first time a task is switched to, the task's function is
905
called with no arguments. On subsequent switches, `arg` is returned from the task's last
906
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
907
or scheduling in any way. Its use is discouraged.
908
"""
909
function yieldto(t::Task, @nospecialize(x=nothing))
9✔
910
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
911
    # state error instead.
912
    if t._state === task_state_done
10✔
913
        return x
×
914
    elseif t._state === task_state_failed
5✔
915
        throw(t.result)
×
916
    end
917
    t.result = x
5✔
918
    set_next_task(t)
5✔
919
    return try_yieldto(identity)
5✔
920
end
921

922
function try_yieldto(undo)
11,669,262✔
923
    try
11,669,256✔
924
        ccall(:jl_switch, Cvoid, ())
11,661,425✔
925
    catch
926
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
6✔
927
        rethrow()
6✔
928
    end
929
    ct = current_task()
3,797,620✔
930
    if ct._isexception
3,797,825✔
931
        exc = ct.result
2,212✔
932
        ct.result = nothing
2,212✔
933
        ct._isexception = false
2,212✔
934
        throw(exc)
2,212✔
935
    end
936
    result = ct.result
3,795,336✔
937
    ct.result = nothing
3,795,312✔
938
    return result
3,795,209✔
939
end
940

941
# yield to a task, throwing an exception in it
942
function throwto(t::Task, @nospecialize exc)
4✔
943
    t.result = exc
4✔
944
    t._isexception = true
4✔
945
    set_next_task(t)
4✔
946
    return try_yieldto(identity)
4✔
947
end
948

949
function ensure_rescheduled(othertask::Task)
5✔
950
    ct = current_task()
5✔
951
    W = workqueue_for(Threads.threadid())
5✔
952
    if ct !== othertask && othertask._state === task_state_runnable
5✔
953
        # we failed to yield to othertask
954
        # return it to the head of a queue to be retried later
955
        tid = Threads.threadid(othertask)
5✔
956
        Wother = tid == 0 ? W : workqueue_for(tid)
10✔
957
        pushfirst!(Wother, othertask)
5✔
958
    end
959
    # if the current task was queued,
960
    # also need to return it to the runnable state
961
    # before throwing an error
962
    list_deletefirst!(W, ct)
5✔
963
    nothing
5✔
964
end
965

966
function trypoptask(W::StickyWorkqueue)
15,690,130✔
967
    while !isempty(W)
15,689,600✔
968
        t = popfirst!(W)
5,979,355✔
969
        if t._state !== task_state_runnable
5,979,355✔
970
            # assume this somehow got queued twice,
971
            # probably broken now, but try discarding this switch and keep going
972
            # can't throw here, because it's probably not the fault of the caller to wait
973
            # and don't want to use print() here, because that may try to incur a task switch
974
            ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
1✔
975
                "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
976
            continue
×
977
        end
978
        return t
5,979,354✔
979
    end
1✔
980
    return Partr.multiq_deletemin()
9,716,923✔
981
end
982

983
checktaskempty = Partr.multiq_check_empty
984

985
@noinline function poptask(W::StickyWorkqueue)
11,676,056✔
986
    task = trypoptask(W)
11,675,579✔
987
    if !(task isa Task)
11,677,639✔
988
        task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
1,177,353✔
989
    end
990
    set_next_task(task)
11,676,511✔
991
    nothing
11,672,193✔
992
end
993

994
function wait()
11,681,248✔
995
    GC.safepoint()
11,680,918✔
996
    W = workqueue_for(Threads.threadid())
11,680,630✔
997
    poptask(W)
11,676,685✔
998
    result = try_yieldto(ensure_rescheduled)
11,672,490✔
999
    process_events()
3,795,098✔
1000
    # return when we come out of the queue
1001
    return result
3,795,825✔
1002
end
1003

1004
if Sys.iswindows()
1005
    pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
×
1006
else
1007
    pause() = ccall(:pause, Cvoid, ())
×
1008
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc