• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37527

pending completion
#37527

push

local

web-flow
make `IRShow.method_name` inferrable (#49607)

18 of 18 new or added lines in 3 files covered. (100.0%)

68710 of 81829 relevant lines covered (83.97%)

33068903.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.61
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
1✔
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
7,764,481✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
50✔
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

5✔
16
        # Process bt_raw so that it can be safely serialized
1✔
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
50✔
18
        CapturedException(ex, bt_lines)
50✔
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
92✔
22
end
23

24
function showerror(io::IO, ce::CapturedException)
3✔
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
3✔
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
6✔
35
"""
6✔
36
capture_exception(ex, bt) = CapturedException(ex, bt)
1✔
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
50✔
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
116✔
49
    CompositeException(exceptions) = new(exceptions)
13✔
50
end
51
length(c::CompositeException) = length(c.exceptions)
11✔
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
14✔
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
107✔
54
isempty(c::CompositeException) = isempty(c.exceptions)
11✔
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
16✔
56
eltype(::Type{CompositeException}) = Any
9✔
57

58
function showerror(io::IO, ex::CompositeException)
11✔
59
    if !isempty(ex)
11✔
60
        showerror(io, ex.exceptions[1])
10✔
61
        remaining = length(ex) - 1
10✔
62
        if remaining > 0
32✔
63
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
4✔
64
        end
65
    else
66
        print(io, "CompositeException()\n")
51✔
67
    end
68
end
22✔
69

70
"""
71
    TaskFailedException
72

73
This exception is thrown by a `wait(t)` call when task `t` fails.
74
`TaskFailedException` wraps the failed task `t`.
75
"""
76
struct TaskFailedException <: Exception
77
    task::Task
1,402✔
78
end
79

80
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
35✔
81
    print(io, "TaskFailedException")
12✔
82
    if bt !== nothing && backtrace
12✔
83
        show_backtrace(io, bt)
1✔
84
    end
85
    println(io)
12✔
86
    printstyled(io, "\n    nested task error: ", color=error_color())
12✔
87
    show_task_exception(io, ex.task)
12✔
88
end
89

90
function show_task_exception(io::IO, t::Task; indent = true)
24✔
91
    stack = current_exceptions(t)
12✔
92
    b = IOBuffer()
12✔
93
    if isempty(stack)
12✔
94
        # exception stack buffer not available; probably a serialized task
95
        showerror(IOContext(b, io), t.result)
1✔
96
    else
97
        show_exception_stack(IOContext(b, io), stack)
11✔
98
    end
99
    str = String(take!(b))
12✔
100
    if indent
12✔
101
        str = replace(str, "\n" => "\n    ")
12✔
102
    end
103
    print(io, str)
12✔
104
end
105

106
function show(io::IO, t::Task)
×
107
    print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
108
end
109

110
"""
111
    @task
112

113
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
114
creates a task, and does not run it.
115

116
# Examples
117
```jldoctest
118
julia> a1() = sum(i for i in 1:1000);
80,000✔
119

90,000✔
120
julia> b = @task a1();
121

122
julia> istaskstarted(b)
123
false
124

125
julia> schedule(b);
126

127
julia> yield();
128

129
julia> istaskdone(b)
130
true
131
```
132
"""
133
macro task(ex)
21✔
134
    :(Task(()->$(esc(ex))))
603✔
135
end
136

137
"""
138
    current_task()
139

140
Get the currently running [`Task`](@ref).
141
"""
142
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
156,990,375✔
143

144
# task states
145

146
const task_state_runnable = UInt8(0)
147
const task_state_done     = UInt8(1)
148
const task_state_failed   = UInt8(2)
149

150
const _state_index = findfirst(==(:_state), fieldnames(Task))
151
@eval function load_state_acquire(t)
3✔
152
    # TODO: Replace this by proper atomic operations when available
153
    @GC.preserve t llvmcall($("""
18,453,190✔
154
        %ptr = inttoptr i$(Sys.WORD_SIZE) %0 to i8*
155
        %rv = load atomic i8, i8* %ptr acquire, align 8
156
        ret i8 %rv
157
        """), UInt8, Tuple{Ptr{UInt8}},
158
        Ptr{UInt8}(pointer_from_objref(t) + fieldoffset(Task, _state_index)))
159
end
160

161
@inline function getproperty(t::Task, field::Symbol)
30,389,104✔
162
    if field === :state
30,389,104✔
163
        # TODO: this field name should be deprecated in 2.0
4✔
164
        st = load_state_acquire(t)
11✔
165
        if st === task_state_runnable
11✔
166
            return :runnable
1✔
167
        elseif st === task_state_done
10✔
168
            return :done
6✔
169
        elseif st === task_state_failed
8✔
170
            return :failed
4✔
171
        else
172
            @assert false
×
173
        end
174
    elseif field === :backtrace
30,389,093✔
175
        # TODO: this field name should be deprecated in 2.0
176
        return current_exceptions(t)[end][2]
×
177
    elseif field === :exception
30,389,093✔
178
        # TODO: this field name should be deprecated in 2.0
179
        return t._isexception ? t.result : nothing
18✔
180
    else
181
        return getfield(t, field)
521,447,545✔
182
    end
183
end
184

185
"""
186
    istaskdone(t::Task) -> Bool
187

188
Determine whether a task has exited.
189

190
# Examples
191
```jldoctest
192
julia> a2() = sum(i for i in 1:1000);
193

194
julia> b = Task(a2);
3✔
195

196
julia> istaskdone(b)
197
false
198

199
julia> schedule(b);
200

201
julia> yield();
202

203
julia> istaskdone(b)
204
true
205
```
206
"""
207
istaskdone(t::Task) = load_state_acquire(t) !== task_state_runnable
9,852,273✔
208

209
"""
210
    istaskstarted(t::Task) -> Bool
211

212
Determine whether a task has started executing.
213

214
# Examples
215
```jldoctest
216
julia> a3() = sum(i for i in 1:1000);
217

218
julia> b = Task(a3);
219

220
julia> istaskstarted(b)
221
false
222
```
223
"""
224
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
4✔
225

226
"""
227
    istaskfailed(t::Task) -> Bool
228

229
Determine whether a task has exited because an exception was thrown.
230

231
# Examples
232
```jldoctest
233
julia> a4() = error("task failed");
234

235
julia> b = Task(a4);
236

237
julia> istaskfailed(b)
238
false
239

240
julia> schedule(b);
241

242
julia> yield();
243

244
julia> istaskfailed(b)
245
true
246
```
247

248
!!! compat "Julia 1.3"
249
    This function requires at least Julia 1.3.
250
"""
251
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)
8,607,318✔
252

253
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
5,276,299✔
254
function Threads.threadpool(t::Task)
×
255
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
4,659,131✔
256
    return Threads._tpid_to_sym(tpid)
9,315,347✔
257
end
258

259
task_result(t::Task) = t.result
4,936,782✔
260

261
task_local_storage() = get_task_tls(current_task())
38,563,343✔
262
function get_task_tls(t::Task)
×
263
    if t.storage === nothing
38,563,425✔
264
        t.storage = IdDict()
675✔
265
    end
266
    return (t.storage)::IdDict{Any,Any}
38,563,385✔
267
end
268

269
"""
270
    task_local_storage(key)
271

272
Look up the value of a key in the current task's task-local storage.
273
"""
274
task_local_storage(key) = task_local_storage()[key]
1,473✔
275

276
"""
277
    task_local_storage(key, value)
278

279
Assign a value to a key in the current task's task-local storage.
280
"""
281
task_local_storage(key, val) = (task_local_storage()[key] = val)
1✔
282

283
"""
284
    task_local_storage(body, key, value)
285

286
Call the function `body` with a modified task-local storage, in which `value` is assigned to
287
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
288
for emulating dynamic scoping.
289
"""
290
function task_local_storage(body::Function, key, val)
×
291
    tls = task_local_storage()
×
292
    hadkey = haskey(tls, key)
×
293
    old = get(tls, key, nothing)
×
294
    tls[key] = val
×
295
    try
×
296
        return body()
×
297
    finally
298
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
299
    end
300
end
301

302
# just wait for a task to be done, no error propagation
303
function _wait(t::Task)
4,612,943✔
304
    if !istaskdone(t)
4,612,974✔
305
        lock(t.donenotify)
2,445,644✔
306
        try
2,445,742✔
307
            while !istaskdone(t)
4,889,619✔
308
                wait(t.donenotify)
2,444,821✔
309
            end
2,444,665✔
310
        finally
311
            unlock(t.donenotify)
2,444,828✔
312
        end
313
    end
314
    nothing
4,612,147✔
315
end
316

317
# have `waiter` wait for `t`
318
function _wait2(t::Task, waiter::Task)
118,700✔
319
    if !istaskdone(t)
118,700✔
320
        lock(t.donenotify)
118,700✔
321
        if !istaskdone(t)
118,700✔
322
            push!(t.donenotify.waitq, waiter)
118,700✔
323
            unlock(t.donenotify)
118,700✔
324
            # since _wait2 is similar to schedule, we should observe the sticky
325
            # bit, even if we aren't calling `schedule` due to this early-return
326
            if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
118,700✔
327
                # Issue #41324
328
                # t.sticky && tid == 0 is a task that needs to be co-scheduled with
329
                # the parent task. If the parent (current_task) is not sticky we must
330
                # set it to be sticky.
331
                # XXX: Ideally we would be able to unset this
332
                current_task().sticky = true
267✔
333
                tid = Threads.threadid()
267✔
334
                ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
267✔
335
            end
336
            return nothing
118,700✔
337
        else
338
            unlock(t.donenotify)
×
339
        end
340
    end
341
    schedule(waiter)
×
342
    nothing
×
343
end
344

345
function wait(t::Task)
1,193,289✔
346
    t === current_task() && error("deadlock detected: cannot wait on current task")
2,254,157✔
347
    _wait(t)
2,254,141✔
348
    if istaskfailed(t)
2,253,502✔
349
        throw(TaskFailedException(t))
1,250✔
350
    end
351
    nothing
2,252,309✔
352
end
353

354
"""
355
    fetch(x::Any)
356

357
Return `x`.
358
"""
359
fetch(@nospecialize x) = x
×
360

361
"""
362
    fetch(t::Task)
363

364
Wait for a Task to finish, then return its result value.
365
If the task fails with an exception, a `TaskFailedException` (which wraps the failed task)
366
is thrown.
367
"""
368
function fetch(t::Task)
1,059,990✔
369
    wait(t)
1,060,030✔
370
    return task_result(t)
1,059,430✔
371
end
372

373

374
## lexically-scoped waiting for multiple items
375

376
struct ScheduledAfterSyncException <: Exception
377
    values::Vector{Any}
107✔
378
end
379

380
function showerror(io::IO, ex::ScheduledAfterSyncException)
×
381
    print(io, "ScheduledAfterSyncException: ")
×
382
    if isempty(ex.values)
×
383
        print(io, "(no values)")
×
384
        return
×
385
    end
386
    show(io, ex.values[1])
×
387
    if length(ex.values) == 1
×
388
        print(io, " is")
×
389
    elseif length(ex.values) == 2
×
390
        print(io, " and one more ")
×
391
        print(io, nameof(typeof(ex.values[2])))
×
392
        print(io, " are")
×
393
    else
394
        print(io, " and ", length(ex.values) - 1, " more objects are")
×
395
    end
396
    print(io, " registered after the end of a `@sync` block")
×
397
end
398

399
function sync_end(c::Channel{Any})
4,408✔
400
    local c_ex
×
401
    while isready(c)
1,151,657✔
402
        r = take!(c)
1,147,249✔
403
        if isa(r, Task)
1,147,249✔
404
            _wait(r)
1,147,070✔
405
            if istaskfailed(r)
1,147,070✔
406
                if !@isdefined(c_ex)
11✔
407
                    c_ex = CompositeException()
6✔
408
                end
409
                push!(c_ex, TaskFailedException(r))
11✔
410
            end
411
        else
412
            try
179✔
413
                wait(r)
179✔
414
            catch e
415
                if !@isdefined(c_ex)
3✔
416
                    c_ex = CompositeException()
2✔
417
                end
418
                push!(c_ex, e)
3✔
419
            end
420
        end
421
    end
1,147,249✔
422
    close(c)
4,408✔
423

424
    # Capture all waitable objects scheduled after the end of `@sync` and
425
    # include them in the exception. This way, the user can check what was
426
    # scheduled by examining at the exception object.
427
    if isready(c)
4,408✔
428
        local racy
×
429
        for r in c
107✔
430
            if !@isdefined(racy)
107✔
431
                racy = []
107✔
432
            end
433
            push!(racy, r)
107✔
434
        end
107✔
435
        if @isdefined(racy)
107✔
436
            if !@isdefined(c_ex)
107✔
437
                c_ex = CompositeException()
107✔
438
            end
439
            # Since this is a clear programming error, show this exception first:
440
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
107✔
441
        end
442
    end
443

444
    if @isdefined(c_ex)
4,408✔
445
        throw(c_ex)
115✔
446
    end
447
    nothing
4,293✔
448
end
449

450
const sync_varname = gensym(:sync)
451

452
"""
453
    @sync
454

455
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn), `@spawnat` and `@distributed`
456
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
457
a [`CompositeException`](@ref).
458

459
# Examples
460
```julia-repl
461
julia> Threads.nthreads()
462
4
463

464
julia> @sync begin
465
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
466
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
467
       end;
468
Thread-id 3, task 1
469
Thread-id 1, task 2
470
```
471
"""
472
macro sync(block)
82✔
473
    var = esc(sync_varname)
82✔
474
    quote
82✔
475
        let $var = Channel(Inf)
4,379✔
476
            v = $(esc(block))
66,707✔
477
            sync_end($var)
4,379✔
478
            v
4,203✔
479
        end
480
    end
481
end
482

483
# schedule an expression to run asynchronously
484

485
"""
486
    @async
487

488
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
489

490
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
491
constructed underlying closure. This allows you to insert the _value_ of a variable,
492
isolating the asynchronous code from changes to the variable's value in the current task.
493

494
!!! warning
56✔
495
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
496
    parallelism is required** especially in publicly distributed libraries.  This is
497
    because a use of `@async` disables the migration of the *parent* task across worker
498
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
499
    `@async` in a library function can have a large impact on the performance of very
500
    different parts of user applications.
501

502
!!! compat "Julia 1.4"
503
    Interpolating values via `\$` is available as of Julia 1.4.
504
"""
505
macro async(expr)
282✔
506
    do_async_macro(expr)
282✔
507
end
508

509
# generate the code for @async, possibly wrapping the task in something before
510
# pushing it to the wait queue.
511
function do_async_macro(expr; wrap=identity)
518✔
512
    letargs = Base._lift_one_interp!(expr)
259✔
513

514
    thunk = esc(:(()->($expr)))
271,563✔
515
    var = esc(sync_varname)
259✔
516
    quote
259✔
517
        let $(letargs...)
108✔
518
            local task = Task($thunk)
205,547✔
519
            if $(Expr(:islocal, var))
205,374✔
520
                put!($var, $(wrap(:task)))
82,337✔
521
            end
522
            schedule(task)
205,547✔
523
            task
205,312✔
524
        end
525
    end
526
end
527

528
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
529
struct UnwrapTaskFailedException <: Exception
530
    task::Task
165✔
531
end
532

533
# common code for wait&fetch for UnwrapTaskFailedException
534
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
165✔
535
    try
165✔
536
        f(t.task)
165✔
537
    catch ex
538
        if ex isa TaskFailedException
3✔
539
            throw(ex.task.exception)
3✔
540
        else
541
            rethrow()
×
542
        end
543
    end
544
end
545

546
# the unwrapping for above task wrapper (gets triggered in sync_end())
547
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
165✔
548

549
# same for fetching the tasks, for convenience
550
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
551

552
# macro for running async code that doesn't throw wrapped exceptions
553
macro async_unwrap(expr)
9✔
554
    do_async_macro(expr, wrap=task->:(Base.UnwrapTaskFailedException($task)))
18✔
555
end
556

557
"""
558
    errormonitor(t::Task)
559

560
Print an error log to `stderr` if task `t` fails.
561

562
# Examples
563
```julia-repl
564
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
565
Unhandled Task ERROR: task failed
566
Stacktrace:
567
[...]
568
```
569
"""
131,070✔
570
function errormonitor(t::Task)
117,217✔
571
    t2 = Task() do
118,470✔
572
        if istaskfailed(t)
117,576✔
573
            local errs = stderr
×
574
            try # try to display the failure atomically
×
575
                errio = IOContext(PipeBuffer(), errs::IO)
×
576
                emphasize(errio, "Unhandled Task ")
×
577
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
×
578
                write(errs, errio)
×
579
            catch
580
                try # try to display the secondary error atomically
×
581
                    errio = IOContext(PipeBuffer(), errs::IO)
×
582
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
583
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
584
                    write(errs, errio)
×
585
                    flush(errs)
×
586
                    # and then the actual error, as best we can
587
                    Core.print(Core.stderr, "while handling: ")
×
588
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
589
                catch e
590
                    # give up
591
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
592
                            " while trying to print a failed Task notice; giving up\n")
593
                end
594
            end
595
        end
596
        nothing
117,576✔
597
    end
598
    t2.sticky = false
118,470✔
599
    _wait2(t, t2)
118,470✔
600
    return t
1✔
601
end
602

603
# Capture interpolated variables in $() and move them to let-block
604
function _lift_one_interp!(e)
111✔
605
    letargs = Any[]  # store the new gensymed arguments
365✔
606
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
362✔
607
    letargs
111✔
608
end
609
_lift_one_interp_helper(v, _, _) = v
9✔
610
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
1,708✔
611
    if expr.head === :$
1,708✔
612
        if in_quote_context  # This $ is simply interpolating out of the quote
134✔
613
            # Now, we're out of the quote, so any _further_ $ is ours.
614
            in_quote_context = false
×
615
        else
616
            newarg = gensym()
110✔
617
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
110✔
618
            return newarg  # Don't recurse into the lifted $() exprs
134✔
619
        end
620
    elseif expr.head === :quote
1,574✔
621
        in_quote_context = true   # Don't try to lift $ directly out of quotes
20✔
622
    elseif expr.head === :macrocall
1,554✔
623
        return expr  # Don't recur into macro calls, since some other macros use $
83✔
624
    end
625
    for (i,e) in enumerate(expr.args)
3,016✔
626
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
6,446✔
627
    end
6,303✔
628
    expr
1,515✔
629
end
630

631

632
# add a wait-able object to the sync pool
633
macro sync_add(expr)
634
    var = esc(sync_varname)
635
    quote
636
        local ref = $(esc(expr))
637
        put!($var, ref)
638
        ref
639
    end
640
end
641

642
# runtime system hook called when a task finishes
643
function task_done_hook(t::Task)
3,879,603✔
644
    # `finish_task` sets `sigatomic` before entering this function
645
    err = istaskfailed(t)
3,878,668✔
646
    result = task_result(t)
3,879,242✔
647
    handled = false
×
648

649
    donenotify = t.donenotify
3,879,057✔
650
    if isa(donenotify, ThreadSynchronizer)
3,879,057✔
651
        lock(donenotify)
3,878,932✔
652
        try
3,879,063✔
653
            if !isempty(donenotify.waitq)
3,878,496✔
654
                handled = true
2,562,352✔
655
                notify(donenotify)
2,562,361✔
656
            end
657
        finally
658
            unlock(donenotify)
7,752,469✔
659
        end
660
    end
661

662
    if err && !handled && Threads.threadid() == 1
3,877,579✔
663
        if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
9✔
664
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
665
            active_repl_backend.in_eval
666
            throwto(active_repl_backend.backend_task, result) # this terminates the task
×
667
        end
668
    end
669
    # Clear sigatomic before waiting
670
    sigatomic_end()
3,877,203✔
671
    try
3,876,632✔
672
        wait() # this will not return
3,875,252✔
673
    catch e
674
        # If an InterruptException happens while blocked in the event loop, try handing
675
        # the exception to the REPL task since the current task is done.
676
        # issue #19467
677
        if Threads.threadid() == 1 &&
×
678
            isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
679
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
680
            active_repl_backend.in_eval
681
            throwto(active_repl_backend.backend_task, e)
×
682
        else
683
            rethrow()
×
684
        end
685
    end
686
end
687

688

128✔
689
## scheduler and work queue
690

691
struct IntrusiveLinkedListSynchronized{T}
692
    queue::IntrusiveLinkedList{T}
693
    lock::Threads.SpinLock
694
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
59✔
695
end
696
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
12,545,697✔
697
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
1✔
698
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5,046,488✔
699
    lock(W.lock)
5,046,488✔
700
    try
5,046,488✔
701
        push!(W.queue, t)
8,663,085✔
702
    finally
703
        unlock(W.lock)
10,092,879✔
704
    end
705
    return W
5,046,487✔
706
end
707
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
6✔
708
    lock(W.lock)
6✔
709
    try
6✔
710
        pushfirst!(W.queue, t)
12✔
711
    finally
712
        unlock(W.lock)
12✔
713
    end
714
    return W
6✔
715
end
716
function pop!(W::IntrusiveLinkedListSynchronized)
×
717
    lock(W.lock)
×
718
    try
×
719
        return pop!(W.queue)
×
720
    finally
721
        unlock(W.lock)
×
722
    end
723
end
724
function popfirst!(W::IntrusiveLinkedListSynchronized)
5,046,360✔
725
    lock(W.lock)
5,046,360✔
726
    try
5,046,360✔
727
        return popfirst!(W.queue)
8,662,936✔
728
    finally
729
        unlock(W.lock)
5,046,359✔
730
    end
731
end
732
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
6✔
733
    lock(W.lock)
6✔
734
    try
6✔
735
        list_deletefirst!(W.queue, t)
6✔
736
    finally
737
        unlock(W.lock)
12✔
738
    end
739
    return W
6✔
740
end
741

742
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
743
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
744
const Workqueues_lock = Threads.SpinLock()
745
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
746

747
function workqueue_for(tid::Int)
12,665,140✔
748
    qs = Workqueues
12,665,124✔
749
    if length(qs) >= tid && isassigned(qs, tid)
12,665,035✔
750
        return @inbounds qs[tid]
12,665,962✔
751
    end
752
    # slow path to allocate it
753
    l = Workqueues_lock
×
754
    @lock l begin
59✔
755
        qs = Workqueues
59✔
756
        if length(qs) < tid
59✔
757
            nt = Threads.maxthreadid()
16✔
758
            @assert tid <= nt
16✔
759
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
16✔
760
        end
761
        if !isassigned(qs, tid)
76✔
762
            @inbounds qs[tid] = StickyWorkqueue()
59✔
763
        end
764
        return @inbounds qs[tid]
42✔
765
    end
766
end
767

768
function enq_work(t::Task)
7,621,162✔
769
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
7,621,507✔
770

771
    # Sticky tasks go into their thread's work queue.
772
    if t.sticky
7,621,277✔
773
        tid = Threads.threadid(t)
2,962,226✔
774
        if tid == 0 && !GC.in_finalizer()
2,962,227✔
775
            # The task is not yet stuck to a thread. Stick it to the current
776
            # thread and do the same to the parent task (the current task) so
777
            # that the tasks are correctly co-scheduled (issue #41324).
778
            # XXX: Ideally we would be able to unset this.
779
            tid = Threads.threadid()
272,113✔
780
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
272,113✔
781
            current_task().sticky = true
272,112✔
782
        end
783
        push!(workqueue_for(tid), t)
2,962,227✔
784
    else
785
        tp = Threads.threadpool(t)
9,314,911✔
786
        if Threads.threadpoolsize(tp) == 1
9,316,244✔
787
            # There's only one thread in the task's assigned thread pool;
788
            # use its work queue.
789
            tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
4,168,555✔
790
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
2,084,278✔
791
            push!(workqueue_for(tid), t)
2,084,278✔
792
        else
793
            # Otherwise, put the task in the multiqueue.
794
            Partr.multiq_insert(t, t.priority)
2,575,177✔
795
            tid = 0
×
796
        end
797
    end
798
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
7,618,668✔
799
    return t
7,620,835✔
800
end
801

802
schedule(t::Task) = enq_work(t)
3,762,357✔
803

804
"""
805
    schedule(t::Task, [val]; error=false)
806

807
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
808
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
809

810
If a second argument `val` is provided, it will be passed to the task (via the return value of
811
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
812
the woken task.
813

814
!!! warning
815
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
816
    See [the API reference](@ref low-level-schedule-wait) for more information.
817

818
# Examples
819
```jldoctest
820
julia> a5() = sum(i for i in 1:1000);
821

822
julia> b = Task(a5);
823

824
julia> istaskstarted(b)
825
false
826

827
julia> schedule(b);
828

829
julia> yield();
830

831
julia> istaskstarted(b)
832
true
833

834
julia> istaskdone(b)
835
true
836
```
837
"""
838
function schedule(t::Task, @nospecialize(arg); error=false)
7,665,444✔
839
    # schedule a task to be (re)started with the given value or exception
840
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
3,833,112✔
841
    if error
3,832,916✔
842
        t.queue === nothing || Base.list_deletefirst!(t.queue, t)
2,215✔
843
        setfield!(t, :result, arg)
2,213✔
844
        setfield!(t, :_isexception, true)
2,213✔
845
    else
846
        t.queue === nothing || Base.error("schedule: Task not runnable")
3,830,760✔
847
        setfield!(t, :result, arg)
3,830,580✔
848
    end
849
    enq_work(t)
3,832,817✔
850
    return t
3,833,358✔
851
end
852

853
"""
854
    yield()
855

856
Switch to the scheduler to allow another scheduled task to run. A task that calls this
857
function is still runnable, and will be restarted immediately if there are no other runnable
858
tasks.
859
"""
860
function yield()
26,707✔
861
    ct = current_task()
26,707✔
862
    enq_work(ct)
26,707✔
863
    try
26,707✔
864
        wait()
26,707✔
865
    catch
866
        ct.queue === nothing || list_deletefirst!(ct.queue, ct)
5✔
867
        rethrow()
5✔
868
    end
869
end
870

871
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
7,615,546✔
872

873
"""
874
    yield(t::Task, arg = nothing)
875

876
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
877
immediately yields to `t` before calling the scheduler.
878
"""
879
function yield(t::Task, @nospecialize(x=nothing))
86✔
880
    (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
448✔
881
    t.result = x
224✔
882
    enq_work(current_task())
224✔
883
    set_next_task(t)
224✔
884
    return try_yieldto(ensure_rescheduled)
224✔
885
end
886

887
"""
888
    yieldto(t::Task, arg = nothing)
889

890
Switch to the given task. The first time a task is switched to, the task's function is
891
called with no arguments. On subsequent switches, `arg` is returned from the task's last
892
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
893
or scheduling in any way. Its use is discouraged.
894
"""
895
function yieldto(t::Task, @nospecialize(x=nothing))
9✔
896
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
897
    # state error instead.
898
    if t._state === task_state_done
10✔
899
        return x
×
900
    elseif t._state === task_state_failed
5✔
901
        throw(t.result)
×
902
    end
903
    t.result = x
5✔
904
    set_next_task(t)
5✔
905
    return try_yieldto(identity)
5✔
906
end
907

908
function try_yieldto(undo)
7,610,881✔
909
    try
7,610,845✔
910
        ccall(:jl_switch, Cvoid, ())
7,608,690✔
911
    catch
912
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
7✔
913
        rethrow()
7✔
914
    end
915
    ct = current_task()
3,739,504✔
916
    if ct._isexception
3,739,931✔
917
        exc = ct.result
2,214✔
918
        ct.result = nothing
2,214✔
919
        ct._isexception = false
2,214✔
920
        throw(exc)
2,214✔
921
    end
922
    result = ct.result
3,737,688✔
923
    ct.result = nothing
3,737,860✔
924
    return result
3,737,903✔
925
end
926

927
# yield to a task, throwing an exception in it
928
function throwto(t::Task, @nospecialize exc)
4✔
929
    t.result = exc
4✔
930
    t._isexception = true
4✔
931
    set_next_task(t)
4✔
932
    return try_yieldto(identity)
4✔
933
end
934

935
function ensure_rescheduled(othertask::Task)
6✔
936
    ct = current_task()
6✔
937
    W = workqueue_for(Threads.threadid())
6✔
938
    if ct !== othertask && othertask._state === task_state_runnable
6✔
939
        # we failed to yield to othertask
940
        # return it to the head of a queue to be retried later
941
        tid = Threads.threadid(othertask)
6✔
942
        Wother = tid == 0 ? W : workqueue_for(tid)
12✔
943
        pushfirst!(Wother, othertask)
6✔
944
    end
945
    # if the current task was queued,
946
    # also need to return it to the runnable state
947
    # before throwing an error
948
    list_deletefirst!(W, ct)
6✔
949
    nothing
6✔
950
end
951

952
function trypoptask(W::StickyWorkqueue)
12,546,746✔
953
    while !isempty(W)
12,544,848✔
954
        t = popfirst!(W)
5,046,360✔
955
        if t._state !== task_state_runnable
5,046,360✔
956
            # assume this somehow got queued twice,
957
            # probably broken now, but try discarding this switch and keep going
958
            # can't throw here, because it's probably not the fault of the caller to wait
959
            # and don't want to use print() here, because that may try to incur a task switch
960
            ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
1✔
961
                "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
962
            continue
×
963
        end
964
        return t
5,046,358✔
965
    end
1✔
966
    return Partr.multiq_deletemin()
7,505,774✔
967
end
968

969
checktaskempty = Partr.multiq_check_empty
970

971
@noinline function poptask(W::StickyWorkqueue)
7,616,504✔
972
    task = trypoptask(W)
7,616,367✔
973
    if !(task isa Task)
7,614,744✔
974
        task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
1,404,188✔
975
    end
976
    set_next_task(task)
7,615,785✔
977
    nothing
7,615,078✔
978
end
979

980
function wait()
7,618,610✔
981
    GC.safepoint()
7,618,075✔
982
    W = workqueue_for(Threads.threadid())
7,617,622✔
983
    poptask(W)
7,616,906✔
984
    result = try_yieldto(ensure_rescheduled)
7,614,775✔
985
    process_events()
3,737,981✔
986
    # return when we come out of the queue
987
    return result
3,738,820✔
988
end
989

990
if Sys.iswindows()
991
    pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
×
992
else
993
    pause() = ccall(:pause, Cvoid, ())
×
994
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc