• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37632

26 Sep 2023 06:44AM UTC coverage: 86.999% (-0.9%) from 87.914%
#37632

push

local

web-flow
inference: make `throw` block deoptimization concrete-eval friendly (#49235)

The deoptimization can sometimes destroy the effects analysis and
disable [semi-]concrete evaluation that is otherwise possible. This is
because the deoptimization was designed with the type domain
profitability in mind (#35982), and hasn't been adequately considering
the effects domain.

This commit makes the deoptimization aware of the effects domain more
and enables the `throw` block deoptimization only when the effects
already known to be ineligible for concrete-evaluation.

In our current effect system, `ALWAYS_FALSE`/`false` means that the
effect can not be refined to `ALWAYS_TRUE`/`true` anymore (unless given
user annotation later). Therefore we can enable the `throw` block
deoptimization without hindering the chance of concrete-evaluation when
any of the following conditions are met:
- `effects.consistent === ALWAYS_FALSE`
- `effects.effect_free === ALWAYS_FALSE`
- `effects.terminates === false`
- `effects.nonoverlayed === false`

Here are some numbers:

| Metric | master | this commit | #35982 reverted (set
`unoptimize_throw_blocks=false`) |

|-------------------------|-----------|-------------|--------------------------------------------|
| Base (seconds) | 15.579300 | 15.206645 | 15.296319 |
| Stdlibs (seconds) | 17.919013 | 17.667094 | 17.738128 |
| Total (seconds) | 33.499279 | 32.874737 | 33.035448 |
| Precompilation (seconds) | 49.967516 | 49.421121 | 49.999998 |
| First time `plot(rand(10,3))` [^1] | `2.476678 seconds (11.74 M
allocations)` | `2.430355 seconds (11.77 M allocations)` | `2.514874
seconds (11.64 M allocations)` |
| First time `solve(prob, QNDF())(5.0)` [^2] | `4.469492 seconds (15.32
M allocations)` | `4.499217 seconds (15.41 M allocations)` | `4.470772
seconds (15.38 M allocations)` |

[^1]: With disabling precompilation of Plots.jl.
[^2]: With disabling precompilation of OrdinaryDiffEq.

These numbers ma... (continued)

7 of 7 new or added lines in 1 file covered. (100.0%)

73407 of 84377 relevant lines covered (87.0%)

11275130.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.73
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
15,778,174✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
58✔
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

16
        # Process bt_raw so that it can be safely serialized
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
58✔
18
        CapturedException(ex, bt_lines)
58✔
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
108✔
22
end
23

24
function showerror(io::IO, ce::CapturedException)
3✔
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
3✔
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
35
"""
36
capture_exception(ex, bt) = CapturedException(ex, bt)
1✔
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
104✔
49
    CompositeException(exceptions) = new(exceptions)
13✔
50
end
51
length(c::CompositeException) = length(c.exceptions)
11✔
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
14✔
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
95✔
54
isempty(c::CompositeException) = isempty(c.exceptions)
11✔
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
7✔
56
eltype(::Type{CompositeException}) = Any
×
57

58
function showerror(io::IO, ex::CompositeException)
11✔
59
    if !isempty(ex)
11✔
60
        showerror(io, ex.exceptions[1])
10✔
61
        remaining = length(ex) - 1
10✔
62
        if remaining > 0
10✔
63
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
4✔
64
        end
65
    else
66
        print(io, "CompositeException()\n")
1✔
67
    end
68
end
69

70
"""
71
    TaskFailedException
72

73
This exception is thrown by a [`wait(t)`](@ref) call when task `t` fails.
74
`TaskFailedException` wraps the failed task `t`.
75
"""
76
struct TaskFailedException <: Exception
77
    task::Task
1,440✔
78
end
79

80
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
35✔
81
    print(io, "TaskFailedException")
12✔
82
    if bt !== nothing && backtrace
12✔
83
        show_backtrace(io, bt)
1✔
84
    end
85
    println(io)
12✔
86
    printstyled(io, "\n    nested task error: ", color=error_color())
12✔
87
    show_task_exception(io, ex.task)
12✔
88
end
89

90
function show_task_exception(io::IO, t::Task; indent = true)
24✔
91
    stack = current_exceptions(t)
12✔
92
    b = IOBuffer()
12✔
93
    if isempty(stack)
12✔
94
        # exception stack buffer not available; probably a serialized task
95
        showerror(IOContext(b, io), t.result)
1✔
96
    else
97
        show_exception_stack(IOContext(b, io), stack)
11✔
98
    end
99
    str = String(take!(b))
12✔
100
    if indent
12✔
101
        str = replace(str, "\n" => "\n    ")
12✔
102
    end
103
    print(io, str)
12✔
104
end
105

106
function show(io::IO, t::Task)
×
107
    state = t.state
×
108
    state_str = "$state" * ((state == :runnable && istaskstarted(t)) ? ", started" : "")
×
109
    print(io, "Task ($state_str) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
110
end
111

112
"""
113
    @task
114

115
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
116
creates a task, and does not run it.
117

118
# Examples
119
```jldoctest
120
julia> a1() = sum(i for i in 1:1000);
121

122
julia> b = @task a1();
123

124
julia> istaskstarted(b)
125
false
126

127
julia> schedule(b);
128

129
julia> yield();
130

131
julia> istaskdone(b)
132
true
133
```
134
"""
135
macro task(ex)
21✔
136
    thunk = Base.replace_linenums!(:(()->$(esc(ex))), __source__)
21✔
137
    :(Task($thunk))
21✔
138
end
139

140
"""
141
    current_task()
142

143
Get the currently running [`Task`](@ref).
144
"""
145
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
348,451,374✔
146

147
# task states
148

149
const task_state_runnable = UInt8(0)
150
const task_state_done     = UInt8(1)
151
const task_state_failed   = UInt8(2)
152

153
const _state_index = findfirst(==(:_state), fieldnames(Task))
154
@eval function load_state_acquire(t)
3✔
155
    # TODO: Replace this by proper atomic operations when available
156
    @GC.preserve t llvmcall($("""
30,485,485✔
157
        %ptr = inttoptr i$(Sys.WORD_SIZE) %0 to i8*
158
        %rv = load atomic i8, i8* %ptr acquire, align 8
159
        ret i8 %rv
160
        """), UInt8, Tuple{Ptr{UInt8}},
161
        Ptr{UInt8}(pointer_from_objref(t) + fieldoffset(Task, _state_index)))
162
end
163

164
@inline function getproperty(t::Task, field::Symbol)
523,605,072✔
165
    if field === :state
523,477,792✔
166
        # TODO: this field name should be deprecated in 2.0
167
        st = load_state_acquire(t)
11✔
168
        if st === task_state_runnable
11✔
169
            return :runnable
1✔
170
        elseif st === task_state_done
10✔
171
            return :done
6✔
172
        elseif st === task_state_failed
4✔
173
            return :failed
4✔
174
        else
175
            @assert false
×
176
        end
177
    elseif field === :backtrace
523,561,700✔
178
        # TODO: this field name should be deprecated in 2.0
179
        return current_exceptions(t)[end][2]
×
180
    elseif field === :exception
523,529,741✔
181
        # TODO: this field name should be deprecated in 2.0
182
        return t._isexception ? t.result : nothing
18✔
183
    else
184
        return getfield(t, field)
822,858,028✔
185
    end
186
end
187

188
"""
189
    istaskdone(t::Task) -> Bool
190

191
Determine whether a task has exited.
192

193
# Examples
194
```jldoctest
195
julia> a2() = sum(i for i in 1:1000);
196

197
julia> b = Task(a2);
198

199
julia> istaskdone(b)
200
false
201

202
julia> schedule(b);
203

204
julia> yield();
205

206
julia> istaskdone(b)
207
true
208
```
209
"""
210
istaskdone(t::Task) = load_state_acquire(t) !== task_state_runnable
13,872,439✔
211

212
"""
213
    istaskstarted(t::Task) -> Bool
214

215
Determine whether a task has started executing.
216

217
# Examples
218
```jldoctest
219
julia> a3() = sum(i for i in 1:1000);
220

221
julia> b = Task(a3);
222

223
julia> istaskstarted(b)
224
false
225
```
226
"""
227
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
4✔
228

229
"""
230
    istaskfailed(t::Task) -> Bool
231

232
Determine whether a task has exited because an exception was thrown.
233

234
# Examples
235
```jldoctest
236
julia> a4() = error("task failed");
237

238
julia> b = Task(a4);
239

240
julia> istaskfailed(b)
241
false
242

243
julia> schedule(b);
244

245
julia> yield();
246

247
julia> istaskfailed(b)
248
true
249
```
250

251
!!! compat "Julia 1.3"
252
    This function requires at least Julia 1.3.
253
"""
254
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)
16,617,864✔
255

256
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
4,898,486✔
257
function Threads.threadpool(t::Task)
18✔
258
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
8,707,968✔
259
    return Threads._tpid_to_sym(tpid)
17,412,230✔
260
end
261

262
task_result(t::Task) = t.result
8,943,864✔
263

264
task_local_storage() = get_task_tls(current_task())
68,027,419✔
265
function get_task_tls(t::Task)
×
266
    if t.storage === nothing
68,027,441✔
267
        t.storage = IdDict()
919✔
268
    end
269
    return (t.storage)::IdDict{Any,Any}
68,027,410✔
270
end
271

272
"""
273
    task_local_storage(key)
274

275
Look up the value of a key in the current task's task-local storage.
276
"""
277
task_local_storage(key) = task_local_storage()[key]
×
278

279
"""
280
    task_local_storage(key, value)
281

282
Assign a value to a key in the current task's task-local storage.
283
"""
284
task_local_storage(key, val) = (task_local_storage()[key] = val)
1✔
285

286
"""
287
    task_local_storage(body, key, value)
288

289
Call the function `body` with a modified task-local storage, in which `value` is assigned to
290
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
291
for emulating dynamic scoping.
292
"""
293
function task_local_storage(body::Function, key, val)
×
294
    tls = task_local_storage()
×
295
    hadkey = haskey(tls, key)
×
296
    old = get(tls, key, nothing)
×
297
    tls[key] = val
×
298
    try
×
299
        return body()
×
300
    finally
301
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
302
    end
303
end
304

305
# just wait for a task to be done, no error propagation
306
function _wait(t::Task)
8,613,932✔
307
    if !istaskdone(t)
8,613,902✔
308
        donenotify = t.donenotify::ThreadSynchronizer
2,446,094✔
309
        lock(donenotify)
2,446,096✔
310
        try
2,446,188✔
311
            while !istaskdone(t)
4,891,245✔
312
                wait(donenotify)
2,445,400✔
313
            end
2,445,424✔
314
        finally
315
            unlock(donenotify)
4,891,100✔
316
        end
317
    end
318
    nothing
8,613,431✔
319
end
320

321
# have `waiter` wait for `t`
322
function _wait2(t::Task, waiter::Task)
120,800✔
323
    if !istaskdone(t)
120,800✔
324
        # since _wait2 is similar to schedule, we should observe the sticky
325
        # bit, even if we don't call `schedule` with early-return below
326
        if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
120,800✔
327
            # Issue #41324
328
            # t.sticky && tid == 0 is a task that needs to be co-scheduled with
329
            # the parent task. If the parent (current_task) is not sticky we must
330
            # set it to be sticky.
331
            # XXX: Ideally we would be able to unset this
332
            current_task().sticky = true
22✔
333
            tid = Threads.threadid()
22✔
334
            ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
22✔
335
        end
336
        donenotify = t.donenotify::ThreadSynchronizer
120,800✔
337
        lock(donenotify)
120,800✔
338
        if !istaskdone(t)
120,800✔
339
            push!(donenotify.waitq, waiter)
120,843✔
340
            unlock(donenotify)
241,599✔
341
            return nothing
120,800✔
342
        else
343
            unlock(donenotify)
×
344
        end
345
    end
346
    schedule(waiter)
×
347
    nothing
×
348
end
349

350
function wait(t::Task)
1,195,120✔
351
    t === current_task() && error("deadlock detected: cannot wait on current task")
2,255,198✔
352
    _wait(t)
2,255,204✔
353
    if istaskfailed(t)
2,254,741✔
354
        throw(TaskFailedException(t))
1,288✔
355
    end
356
    nothing
2,253,303✔
357
end
358

359
"""
360
    fetch(x::Any)
361

362
Return `x`.
363
"""
364
fetch(@nospecialize x) = x
×
365

366
"""
367
    fetch(t::Task)
368

369
Wait for a [`Task`](@ref) to finish, then return its result value.
370
If the task fails with an exception, a [`TaskFailedException`](@ref) (which wraps the failed task)
371
is thrown.
372
"""
373
function fetch(t::Task)
1,060,074✔
374
    wait(t)
1,060,038✔
375
    return task_result(t)
1,059,408✔
376
end
377

378

379
## lexically-scoped waiting for multiple items
380

381
struct ScheduledAfterSyncException <: Exception
382
    values::Vector{Any}
95✔
383
end
384

385
function showerror(io::IO, ex::ScheduledAfterSyncException)
×
386
    print(io, "ScheduledAfterSyncException: ")
×
387
    if isempty(ex.values)
×
388
        print(io, "(no values)")
×
389
        return
×
390
    end
391
    show(io, ex.values[1])
×
392
    if length(ex.values) == 1
×
393
        print(io, " is")
×
394
    elseif length(ex.values) == 2
×
395
        print(io, " and one more ")
×
396
        print(io, nameof(typeof(ex.values[2])))
×
397
        print(io, " are")
×
398
    else
399
        print(io, " and ", length(ex.values) - 1, " more objects are")
×
400
    end
401
    print(io, " registered after the end of a `@sync` block")
×
402
end
403

404
function sync_end(c::Channel{Any})
4,417✔
405
    local c_ex
×
406
    while isready(c)
5,151,643✔
407
        r = take!(c)
5,147,228✔
408
        if isa(r, Task)
5,147,228✔
409
            _wait(r)
5,147,049✔
410
            if istaskfailed(r)
5,147,047✔
411
                if !@isdefined(c_ex)
11✔
412
                    c_ex = CompositeException()
6✔
413
                end
414
                push!(c_ex, TaskFailedException(r))
11✔
415
            end
416
        else
417
            try
179✔
418
                wait(r)
179✔
419
            catch e
420
                if !@isdefined(c_ex)
3✔
421
                    c_ex = CompositeException()
2✔
422
                end
423
                push!(c_ex, e)
3✔
424
            end
425
        end
426
    end
5,147,226✔
427
    close(c)
4,416✔
428

429
    # Capture all waitable objects scheduled after the end of `@sync` and
430
    # include them in the exception. This way, the user can check what was
431
    # scheduled by examining at the exception object.
432
    if isready(c)
4,416✔
433
        local racy
×
434
        for r in c
95✔
435
            if !@isdefined(racy)
95✔
436
                racy = []
95✔
437
            end
438
            push!(racy, r)
95✔
439
        end
95✔
440
        if @isdefined(racy)
95✔
441
            if !@isdefined(c_ex)
95✔
442
                c_ex = CompositeException()
95✔
443
            end
444
            # Since this is a clear programming error, show this exception first:
445
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
95✔
446
        end
447
    end
448

449
    if @isdefined(c_ex)
4,416✔
450
        throw(c_ex)
103✔
451
    end
452
    nothing
4,313✔
453
end
454

455
const sync_varname = gensym(:sync)
456

457
"""
458
    @sync
459

460
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn),
461
`Distributed.@spawnat` and `Distributed.@distributed`
462
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
463
a [`CompositeException`](@ref).
464

465
# Examples
466
```julia-repl
467
julia> Threads.nthreads()
468
4
469

470
julia> @sync begin
471
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
472
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
473
       end;
474
Thread-id 3, task 1
475
Thread-id 1, task 2
476
```
477
"""
478
macro sync(block)
60✔
479
    var = esc(sync_varname)
60✔
480
    quote
60✔
481
        let $var = Channel(Inf)
4,385✔
482
            v = $(esc(block))
66,718✔
483
            sync_end($var)
4,385✔
484
            v
4,223✔
485
        end
486
    end
487
end
488

489
# schedule an expression to run asynchronously
490

491
"""
492
    @async
493

494
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
495

496
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
497
constructed underlying closure. This allows you to insert the _value_ of a variable,
498
isolating the asynchronous code from changes to the variable's value in the current task.
499

500
!!! warning
501
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
502
    parallelism is required** especially in publicly distributed libraries.  This is
503
    because a use of `@async` disables the migration of the *parent* task across worker
504
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
505
    `@async` in a library function can have a large impact on the performance of very
506
    different parts of user applications.
507

508
!!! compat "Julia 1.4"
509
    Interpolating values via `\$` is available as of Julia 1.4.
510
"""
511
macro async(expr)
282✔
512
    do_async_macro(expr, __source__)
282✔
513
end
514

515
# generate the code for @async, possibly wrapping the task in something before
516
# pushing it to the wait queue.
517
function do_async_macro(expr, linenums; wrap=identity)
560✔
518
    letargs = Base._lift_one_interp!(expr)
280✔
519

520
    thunk = Base.replace_linenums!(:(()->($(esc(expr)))), linenums)
280✔
521
    var = esc(sync_varname)
280✔
522
    quote
280✔
523
        let $(letargs...)
109✔
524
            local task = Task($thunk)
206,668✔
525
            if $(Expr(:islocal, var))
205,905✔
526
                put!($var, $(wrap(:task)))
82,845✔
527
            end
528
            schedule(task)
206,668✔
529
            task
205,928✔
530
        end
531
    end
532
end
533

534
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
535
struct UnwrapTaskFailedException <: Exception
536
    task::Task
165✔
537
end
538

539
# common code for wait&fetch for UnwrapTaskFailedException
540
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
165✔
541
    try
165✔
542
        f(t.task)
165✔
543
    catch ex
544
        if ex isa TaskFailedException
3✔
545
            throw(ex.task.exception)
3✔
546
        else
547
            rethrow()
×
548
        end
549
    end
550
end
551

552
# the unwrapping for above task wrapper (gets triggered in sync_end())
553
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
165✔
554

555
# same for fetching the tasks, for convenience
556
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
557

558
# macro for running async code that doesn't throw wrapped exceptions
559
macro async_unwrap(expr)
3✔
560
    do_async_macro(expr, __source__, wrap=task->:(Base.UnwrapTaskFailedException($task)))
6✔
561
end
562

563
"""
564
    errormonitor(t::Task)
565

566
Print an error log to `stderr` if task `t` fails.
567

568
# Examples
569
```julia-repl
570
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
571
Unhandled Task ERROR: task failed
572
Stacktrace:
573
[...]
574
```
575
"""
576
function errormonitor(t::Task)
118,767✔
577
    t2 = Task() do
120,518✔
578
        if istaskfailed(t)
119,607✔
579
            local errs = stderr
×
580
            try # try to display the failure atomically
×
581
                errio = IOContext(PipeBuffer(), errs::IO)
×
582
                emphasize(errio, "Unhandled Task ")
×
583
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
×
584
                write(errs, errio)
×
585
            catch
586
                try # try to display the secondary error atomically
×
587
                    errio = IOContext(PipeBuffer(), errs::IO)
×
588
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
589
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
590
                    write(errs, errio)
×
591
                    flush(errs)
×
592
                    # and then the actual error, as best we can
593
                    Core.print(Core.stderr, "while handling: ")
×
594
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
595
                catch e
596
                    # give up
597
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
598
                            " while trying to print a failed Task notice; giving up\n")
599
                end
600
            end
601
        end
602
        nothing
119,607✔
603
    end
604
    t2.sticky = false
120,518✔
605
    _wait2(t, t2)
120,518✔
606
    return t
2✔
607
end
608

609
# Capture interpolated variables in $() and move them to let-block
610
function _lift_one_interp!(e)
124✔
611
    letargs = Any[]  # store the new gensymed arguments
399✔
612
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
398✔
613
    letargs
124✔
614
end
615
_lift_one_interp_helper(v, _, _) = v
9✔
616
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
1,753✔
617
    if expr.head === :$
1,753✔
618
        if in_quote_context  # This $ is simply interpolating out of the quote
129✔
619
            # Now, we're out of the quote, so any _further_ $ is ours.
620
            in_quote_context = false
×
621
        else
622
            newarg = gensym()
105✔
623
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
105✔
624
            return newarg  # Don't recurse into the lifted $() exprs
129✔
625
        end
626
    elseif expr.head === :quote
1,624✔
627
        in_quote_context = true   # Don't try to lift $ directly out of quotes
20✔
628
    elseif expr.head === :macrocall
1,604✔
629
        return expr  # Don't recur into macro calls, since some other macros use $
84✔
630
    end
631
    for (i,e) in enumerate(expr.args)
3,111✔
632
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
6,506✔
633
    end
6,323✔
634
    expr
1,564✔
635
end
636

637

638
# add a wait-able object to the sync pool
639
macro sync_add(expr)
640
    var = esc(sync_varname)
641
    quote
642
        local ref = $(esc(expr))
643
        put!($var, ref)
644
        ref
645
    end
646
end
647

648
# runtime system hook called when a task finishes
649
function task_done_hook(t::Task)
7,886,321✔
650
    # `finish_task` sets `sigatomic` before entering this function
651
    err = istaskfailed(t)
7,885,527✔
652
    result = task_result(t)
7,885,824✔
653
    handled = false
30✔
654

655
    donenotify = t.donenotify
7,885,485✔
656
    if isa(donenotify, ThreadSynchronizer)
7,885,460✔
657
        lock(donenotify)
7,885,286✔
658
        try
7,885,673✔
659
            if !isempty(donenotify.waitq)
7,885,208✔
660
                handled = true
2,565,152✔
661
                notify(donenotify)
2,565,149✔
662
            end
663
        finally
664
            unlock(donenotify)
15,767,385✔
665
        end
666
    end
667

668
    if err && !handled && Threads.threadid() == 1
7,884,617✔
669
        if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
10✔
670
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
671
            active_repl_backend.in_eval
672
            throwto(active_repl_backend.backend_task, result) # this terminates the task
×
673
        end
674
    end
675
    # Clear sigatomic before waiting
676
    sigatomic_end()
7,884,580✔
677
    try
7,884,392✔
678
        wait() # this will not return
7,884,135✔
679
    catch e
680
        # If an InterruptException happens while blocked in the event loop, try handing
681
        # the exception to the REPL task since the current task is done.
682
        # issue #19467
683
        if Threads.threadid() == 1 &&
×
684
            isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
685
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
686
            active_repl_backend.in_eval
687
            throwto(active_repl_backend.backend_task, e)
×
688
        else
689
            rethrow()
×
690
        end
691
    end
692
end
693

694

695
## scheduler and work queue
696

697
mutable struct IntrusiveLinkedListSynchronized{T}
698
    queue::IntrusiveLinkedList{T}
699
    lock::Threads.SpinLock
700
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
127✔
701
end
702
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
16,178,362✔
703
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
1✔
704
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5,865,343✔
705
    lock(W.lock)
5,865,343✔
706
    try
5,865,344✔
707
        push!(W.queue, t)
10,587,078✔
708
    finally
709
        unlock(W.lock)
11,730,591✔
710
    end
711
    return W
5,865,343✔
712
end
713
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5✔
714
    lock(W.lock)
5✔
715
    try
5✔
716
        pushfirst!(W.queue, t)
10✔
717
    finally
718
        unlock(W.lock)
10✔
719
    end
720
    return W
5✔
721
end
722
function pop!(W::IntrusiveLinkedListSynchronized)
×
723
    lock(W.lock)
×
724
    try
×
725
        return pop!(W.queue)
×
726
    finally
727
        unlock(W.lock)
×
728
    end
729
end
730
function popfirst!(W::IntrusiveLinkedListSynchronized)
5,865,272✔
731
    lock(W.lock)
5,865,271✔
732
    try
5,865,273✔
733
        return popfirst!(W.queue)
10,586,990✔
734
    finally
735
        unlock(W.lock)
5,865,272✔
736
    end
737
end
738
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5✔
739
    lock(W.lock)
5✔
740
    try
5✔
741
        list_deletefirst!(W.queue, t)
5✔
742
    finally
743
        unlock(W.lock)
10✔
744
    end
745
    return W
5✔
746
end
747

748
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
749
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
750
const Workqueues_lock = Threads.SpinLock()
751
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
752

753
function workqueue_for(tid::Int)
17,348,178✔
754
    qs = Workqueues
17,348,148✔
755
    if length(qs) >= tid && isassigned(qs, tid)
17,348,115✔
756
        return @inbounds qs[tid]
17,347,545✔
757
    end
758
    # slow path to allocate it
759
    @assert tid > 0
127✔
760
    l = Workqueues_lock
×
761
    @lock l begin
127✔
762
        qs = Workqueues
127✔
763
        if length(qs) < tid
127✔
764
            nt = Threads.maxthreadid()
93✔
765
            @assert tid <= nt
93✔
766
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
93✔
767
        end
768
        if !isassigned(qs, tid)
127✔
769
            @inbounds qs[tid] = StickyWorkqueue()
127✔
770
        end
771
        return @inbounds qs[tid]
127✔
772
    end
773
end
774

775
function enq_work(t::Task)
11,484,749✔
776
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
11,484,758✔
777

778
    # Sticky tasks go into their thread's work queue.
779
    if t.sticky
11,484,667✔
780
        tid = Threads.threadid(t)
2,777,062✔
781
        if tid == 0
2,777,062✔
782
            # The task is not yet stuck to a thread. Stick it to the current
783
            # thread and do the same to the parent task (the current task) so
784
            # that the tasks are correctly co-scheduled (issue #41324).
785
            # XXX: Ideally we would be able to unset this.
786
            if GC.in_finalizer()
273,805✔
787
                # The task was launched in a finalizer. There is no thread to sticky it
788
                # to, so just allow it to run anywhere as if it had been non-sticky.
789
                t.sticky = false
4✔
790
                @goto not_sticky
4✔
791
            else
792
                tid = Threads.threadid()
273,801✔
793
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
273,801✔
794
                current_task().sticky = true
273,801✔
795
            end
796
        end
797
        push!(workqueue_for(tid), t)
2,777,058✔
798
    else
799
        @label not_sticky
×
800
        tp = Threads.threadpool(t)
17,412,139✔
801
        if tp === :foreign || Threads.threadpoolsize(tp) == 1
26,120,543✔
802
            # There's only one thread in the task's assigned thread pool;
803
            # use its work queue.
804
            tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
6,176,567✔
805
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
3,088,287✔
806
            push!(workqueue_for(tid), t)
3,088,287✔
807
        else
808
            # Otherwise, put the task in the multiqueue.
809
            Partr.multiq_insert(t, t.priority)
5,620,139✔
810
            tid = 0
×
811
        end
812
    end
813
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
11,482,745✔
814
    return t
11,485,747✔
815
end
816

817
schedule(t::Task) = enq_work(t)
7,765,932✔
818

819
"""
820
    schedule(t::Task, [val]; error=false)
821

822
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
823
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
824

825
If a second argument `val` is provided, it will be passed to the task (via the return value of
826
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
827
the woken task.
828

829
!!! warning
830
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
831
    See [the API reference](@ref low-level-schedule-wait) for more information.
832

833
# Examples
834
```jldoctest
835
julia> a5() = sum(i for i in 1:1000);
836

837
julia> b = Task(a5);
838

839
julia> istaskstarted(b)
840
false
841

842
julia> schedule(b);
843

844
julia> yield();
845

846
julia> istaskstarted(b)
847
true
848

849
julia> istaskdone(b)
850
true
851
```
852
"""
853
function schedule(t::Task, @nospecialize(arg); error=false)
7,384,972✔
854
    # schedule a task to be (re)started with the given value or exception
855
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
3,692,700✔
856
    if error
3,692,573✔
857
        t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t)
2,202✔
858
        setfield!(t, :result, arg)
2,200✔
859
        setfield!(t, :_isexception, true)
2,200✔
860
    else
861
        t.queue === nothing || Base.error("schedule: Task not runnable")
3,690,386✔
862
        setfield!(t, :result, arg)
3,690,187✔
863
    end
864
    enq_work(t)
3,692,420✔
865
    return t
3,693,353✔
866
end
867

868
"""
869
    yield()
870

871
Switch to the scheduler to allow another scheduled task to run. A task that calls this
872
function is still runnable, and will be restarted immediately if there are no other runnable
873
tasks.
874
"""
875
function yield()
27,201✔
876
    ct = current_task()
27,201✔
877
    enq_work(ct)
27,201✔
878
    try
27,201✔
879
        wait()
27,200✔
880
    catch
881
        ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
4✔
882
        rethrow()
4✔
883
    end
884
end
885

886
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
11,480,902✔
887

888
"""
889
    yield(t::Task, arg = nothing)
890

891
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
892
immediately yields to `t` before calling the scheduler.
893
"""
894
function yield(t::Task, @nospecialize(x=nothing))
91✔
895
    (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
468✔
896
    t.result = x
234✔
897
    enq_work(current_task())
234✔
898
    set_next_task(t)
234✔
899
    return try_yieldto(ensure_rescheduled)
234✔
900
end
901

902
"""
903
    yieldto(t::Task, arg = nothing)
904

905
Switch to the given task. The first time a task is switched to, the task's function is
906
called with no arguments. On subsequent switches, `arg` is returned from the task's last
907
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
908
or scheduling in any way. Its use is discouraged.
909
"""
910
function yieldto(t::Task, @nospecialize(x=nothing))
9✔
911
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
912
    # state error instead.
913
    if t._state === task_state_done
10✔
914
        return x
×
915
    elseif t._state === task_state_failed
5✔
916
        throw(t.result)
×
917
    end
918
    t.result = x
5✔
919
    set_next_task(t)
5✔
920
    return try_yieldto(identity)
5✔
921
end
922

923
function try_yieldto(undo)
11,477,644✔
924
    try
11,477,619✔
925
        ccall(:jl_switch, Cvoid, ())
11,478,272✔
926
    catch
927
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
6✔
928
        rethrow()
6✔
929
    end
930
    ct = current_task()
3,596,813✔
931
    if ct._isexception
3,596,766✔
932
        exc = ct.result
2,201✔
933
        ct.result = nothing
2,201✔
934
        ct._isexception = false
2,201✔
935
        throw(exc)
2,201✔
936
    end
937
    result = ct.result
3,594,475✔
938
    ct.result = nothing
3,594,619✔
939
    return result
3,594,437✔
940
end
941

942
# yield to a task, throwing an exception in it
943
function throwto(t::Task, @nospecialize exc)
4✔
944
    t.result = exc
4✔
945
    t._isexception = true
4✔
946
    set_next_task(t)
4✔
947
    return try_yieldto(identity)
4✔
948
end
949

950
function ensure_rescheduled(othertask::Task)
5✔
951
    ct = current_task()
5✔
952
    W = workqueue_for(Threads.threadid())
5✔
953
    if ct !== othertask && othertask._state === task_state_runnable
5✔
954
        # we failed to yield to othertask
955
        # return it to the head of a queue to be retried later
956
        tid = Threads.threadid(othertask)
5✔
957
        Wother = tid == 0 ? W : workqueue_for(tid)
10✔
958
        pushfirst!(Wother, othertask)
5✔
959
    end
960
    # if the current task was queued,
961
    # also need to return it to the runnable state
962
    # before throwing an error
963
    list_deletefirst!(W, ct)
5✔
964
    nothing
5✔
965
end
966

967
function trypoptask(W::StickyWorkqueue)
16,179,473✔
968
    while !isempty(W)
16,178,495✔
969
        t = popfirst!(W)
5,865,272✔
970
        if t._state !== task_state_runnable
5,865,271✔
971
            # assume this somehow got queued twice,
972
            # probably broken now, but try discarding this switch and keep going
973
            # can't throw here, because it's probably not the fault of the caller to wait
974
            # and don't want to use print() here, because that may try to incur a task switch
975
            ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
1✔
976
                "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
977
            continue
×
978
        end
979
        return t
5,865,271✔
980
    end
1✔
981
    return Partr.multiq_deletemin()
10,316,691✔
982
end
983

984
checktaskempty = Partr.multiq_check_empty
985

986
@noinline function poptask(W::StickyWorkqueue)
11,480,892✔
987
    task = trypoptask(W)
11,480,697✔
988
    if !(task isa Task)
11,478,762✔
989
        task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
1,708,818✔
990
    end
991
    set_next_task(task)
11,481,064✔
992
    nothing
11,480,475✔
993
end
994

995
function wait()
11,483,284✔
996
    GC.safepoint()
11,482,479✔
997
    W = workqueue_for(Threads.threadid())
11,481,915✔
998
    poptask(W)
11,480,734✔
999
    result = try_yieldto(ensure_rescheduled)
11,479,999✔
1000
    process_events()
3,594,435✔
1001
    # return when we come out of the queue
1002
    return result
3,595,693✔
1003
end
1004

1005
if Sys.iswindows()
1006
    pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
×
1007
else
1008
    pause() = ccall(:pause, Cvoid, ())
×
1009
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc