JuliaLang / julia / #38002

06 Feb 2025 06:14AM UTC coverage: 20.322% (-2.4%) from 22.722%
push

local

web-flow
bpart: Fully switch to partitioned semantics (#57253)

This is the final PR in the binding partitions series (modulo bugs and
tweaks), i.e. it closes #54654 and thus closes #40399, which was the
original design sketch.

This activates the full designed semantics for binding partitions, in
particular allowing safe replacement of const bindings, which in turn
allows struct redefinitions. This thus closes timholy/Revise.jl#18 and
also closes #38584.

The biggest semantic change here is probably that this gets rid of the
notion of "resolvedness" of a binding. Previously, a lot of the behavior
of our implementation depended on when bindings were "resolved", which
could happen at basically an arbitrary point (in the compiler, in REPL
completion, in a different thread), making a lot of the semantics around
bindings ill- or at least implementation-defined. There are several
related issues in the bugtracker, so this closes #14055 closes #44604
closes #46354 closes #30277

It is also the last step to close #24569.
It also supports bindings for undef->defined transitions and thus closes
#53958 closes #54733 - however, this is not activated yet for
performance reasons and may need some further optimization.

Since resolvedness no longer exists, we need to replace it with some
hopefully more well-defined semantics. I will describe the semantics
below, but before I do I will make two notes:

1. There are a number of cases where these semantics will behave
slightly differently than the old semantics absent some other task going
around resolving random bindings.
2. The new behavior (except for the replacement stuff) was generally
permissible under the old semantics if the bindings happened to be
resolved at the right time.

With all that said, there are essentially three "strengths" of bindings:

1. Implicit Bindings: Anything implicitly obtained from `using Mod`, "no
binding", plus slightly more exotic corner cases around conflicts

2. Weakly declared bindin... (continued)

11 of 111 new or added lines in 7 files covered. (9.91%)

1273 existing lines in 68 files now uncovered.

9908 of 48755 relevant lines covered (20.32%)

105126.48 hits per line

Source File

30.63
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
17,540✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

16
        # Process bt_raw so that it can be safely serialized
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
×
18
        CapturedException(ex, bt_lines)
×
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
×
22
end
23

24
function showerror(io::IO, ce::CapturedException)
×
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
×
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
35
"""
36
capture_exception(ex, bt) = CapturedException(ex, bt)
×
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
×
49
    CompositeException(exceptions) = new(exceptions)
×
50
end
51
length(c::CompositeException) = length(c.exceptions)
×
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
×
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
×
54
isempty(c::CompositeException) = isempty(c.exceptions)
×
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
×
56

57
function showerror(io::IO, ex::CompositeException)
×
58
    if !isempty(ex)
×
59
        showerror(io, ex.exceptions[1])
×
60
        remaining = length(ex) - 1
×
61
        if remaining > 0
×
62
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
×
63
        end
64
    else
65
        print(io, "CompositeException()\n")
×
66
    end
67
end
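
The `showerror` method above renders only the first wrapped exception in full and summarizes the rest as a count. A minimal sketch of that behavior, assuming plain `ErrorException`s as the wrapped errors (the messages are made up for illustration):

```julia
# Two illustrative errors bundled into one CompositeException.
ce = CompositeException([ErrorException("disk full"), ErrorException("timeout")])

# Only the first exception is printed in full; the others are counted.
msg = sprint(showerror, ce)
print(msg)

# The wrapper forwards `length` and iteration to its `exceptions` vector.
n = length(ce)
messages = [e.msg for e in ce]
```

Because the remaining exceptions are only counted, code that needs every failure should iterate over the `CompositeException` itself rather than parse the printed message.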
68

69
"""
70
    TaskFailedException
71

72
This exception is thrown by a [`wait(t)`](@ref) call when task `t` fails.
73
`TaskFailedException` wraps the failed task `t`.
74
"""
75
struct TaskFailedException <: Exception
76
    task::Task
×
77
end
78

79
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
×
80
    print(io, "TaskFailedException")
×
81
    if bt !== nothing && backtrace
×
82
        show_backtrace(io, bt)
×
83
    end
84
    println(io)
×
85
    printstyled(io, "\n    nested task error: ", color=error_color())
×
86
    show_task_exception(io, ex.task)
×
87
end
88

89
function show_task_exception(io::IO, t::Task; indent = true)
×
90
    stack = current_exceptions(t)
×
91
    b = IOBuffer()
×
92
    if isempty(stack)
×
93
        # exception stack buffer not available; probably a serialized task
94
        showerror(IOContext(b, io), t.result)
×
95
    else
96
        show_exception_stack(IOContext(b, io), stack)
×
97
    end
98
    str = String(take!(b))
×
99
    if indent
×
100
        str = replace(str, "\n" => "\n    ")
×
101
    end
102
    print(io, str)
×
103
end
104

105
function show(io::IO, t::Task)
×
106
    state = t.state
×
107
    state_str = "$state" * ((state == :runnable && istaskstarted(t)) ? ", started" : "")
×
108
    print(io, "Task ($state_str) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
109
end
110

111
"""
112
    @task
113

114
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
115
creates a task, and does not run it.
116

117
!!! warning
118
    By default, tasks have the sticky bit (`t.sticky`) set to `true`. This models the
119
    historic default for [`@async`](@ref). Sticky tasks can only be run on the worker thread
120
    they are first scheduled on, and when scheduled will make the task that they were scheduled
121
    from sticky. To obtain the behavior of [`Threads.@spawn`](@ref) set the sticky
122
    bit manually to `false`.
123

124
# Examples
125
```jldoctest
126
julia> a1() = sum(i for i in 1:1000);
127

128
julia> b = @task a1();
129

130
julia> istaskstarted(b)
131
false
132

133
julia> schedule(b);
134

135
julia> yield();
136

137
julia> istaskdone(b)
138
true
139
```
140
"""
141
macro task(ex)
142
    thunk = Base.replace_linenums!(:(()->$(esc(ex))), __source__)
×
143
    :(Task($thunk))
144
end
145

146
# task states
147

148
const task_state_runnable = UInt8(0)
149
const task_state_done     = UInt8(1)
150
const task_state_failed   = UInt8(2)
151

152
@inline function getproperty(t::Task, field::Symbol)
153
    if field === :state
7,881✔
154
        # TODO: this field name should be deprecated in 2.0
155
        st = @atomic :acquire t._state
×
156
        if st === task_state_runnable
×
157
            return :runnable
×
158
        elseif st === task_state_done
×
159
            return :done
×
160
        elseif st === task_state_failed
×
161
            return :failed
×
162
        else
163
            @assert false
×
164
        end
165
    elseif field === :backtrace
7,881✔
166
        # TODO: this field name should be deprecated in 2.0
167
        return current_exceptions(t)[end][2]
×
168
    elseif field === :exception
7,881✔
169
        # TODO: this field name should be deprecated in 2.0
170
        return t._isexception ? t.result : nothing
×
171
    elseif field === :scope
7,881✔
172
        error("""
×
173
            Querying a Task's `scope` field is disallowed.
174
            The private `Core.current_scope()` function is better, though still an implementation detail.""")
175
    else
176
        return getfield(t, field)
14,563✔
177
    end
178
end
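
The `getproperty` overload above maps the internal `_state` byte to the symbols `:runnable`, `:done` and `:failed`. A small sketch of how those values can be observed from user code; the task bodies are illustrative, and since the listing marks the `state` field name as a deprecation candidate, the predicates `istaskdone` and `istaskfailed` are probably the safer choice in new code:

```julia
ok = Task(() -> 1 + 1)
state_before = ok.state        # not yet scheduled
schedule(ok)
wait(ok)
state_after = ok.state         # finished normally

bad = Task(() -> error("boom"))
schedule(bad)
try
    wait(bad)                  # a failed task makes `wait` throw
catch
end
state_failed = bad.state       # finished with an exception
```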
179

180
@inline function setproperty!(t::Task, field::Symbol, @nospecialize(v))
181
    if field === :scope
7,872✔
182
        istaskstarted(t) && error("Setting scope on a started task directly is disallowed.")
×
183
    end
184
    return @invoke setproperty!(t::Any, field::Symbol, v::Any)
9,170✔
185
end
186

187
"""
188
    istaskdone(t::Task) -> Bool
189

190
Determine whether a task has exited.
191

192
# Examples
193
```jldoctest
194
julia> a2() = sum(i for i in 1:1000);
195

196
julia> b = Task(a2);
197

198
julia> istaskdone(b)
199
false
200

201
julia> schedule(b);
202

203
julia> yield();
204

205
julia> istaskdone(b)
206
true
207
```
208
"""
209
istaskdone(t::Task) = (@atomic :acquire t._state) !== task_state_runnable
2,706✔
210

211
"""
212
    istaskstarted(t::Task) -> Bool
213

214
Determine whether a task has started executing.
215

216
# Examples
217
```jldoctest
218
julia> a3() = sum(i for i in 1:1000);
219

220
julia> b = Task(a3);
221

222
julia> istaskstarted(b)
223
false
224
```
225
"""
226
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
×
227

228
"""
229
    istaskfailed(t::Task) -> Bool
230

231
Determine whether a task has exited because an exception was thrown.
232

233
# Examples
234
```jldoctest
235
julia> a4() = error("task failed");
236

237
julia> b = Task(a4);
238

239
julia> istaskfailed(b)
240
false
241

242
julia> schedule(b);
243

244
julia> yield();
245

246
julia> istaskfailed(b)
247
true
248
```
249

250
!!! compat "Julia 1.3"
251
    This function requires at least Julia 1.3.
252
"""
253
istaskfailed(t::Task) = ((@atomic :acquire t._state) === task_state_failed)
245✔
254

255
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
239✔
256
function Threads.threadpool(t::Task)
257
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
3✔
258
    return Threads._tpid_to_sym(tpid)
5✔
259
end
260

261
task_result(t::Task) = t.result
60✔
262

263
task_local_storage() = get_task_tls(current_task())
1,335✔
264
function get_task_tls(t::Task)
265
    if t.storage === nothing
1,335✔
266
        t.storage = IdDict()
1✔
267
    end
268
    return (t.storage)::IdDict{Any,Any}
1,335✔
269
end
270

271
"""
272
    task_local_storage(key)
273

274
Look up the value of a key in the current task's task-local storage.
275
"""
276
task_local_storage(key) = task_local_storage()[key]
×
277

278
"""
279
    task_local_storage(key, value)
280

281
Assign a value to a key in the current task's task-local storage.
282
"""
283
task_local_storage(key, val) = (task_local_storage()[key] = val)
×
284

285
"""
286
    task_local_storage(body, key, value)
287

288
Call the function `body` with a modified task-local storage, in which `value` is assigned to
289
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
290
for emulating dynamic scoping.
291
"""
292
function task_local_storage(body::Function, key, val)
×
293
    tls = task_local_storage()
×
294
    hadkey = haskey(tls, key)
×
295
    old = get(tls, key, nothing)
×
296
    tls[key] = val
×
297
    try
×
298
        return body()
×
299
    finally
300
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
301
    end
302
end
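
The three-argument method above temporarily overrides a key and restores the previous state in its `finally` clause. A short sketch of that save-and-restore behavior, using the made-up keys `:verbosity` and `:scratch`:

```julia
task_local_storage(:verbosity, 1)            # baseline value for this task

inner = task_local_storage(:verbosity, 3) do
    task_local_storage(:verbosity)           # sees the temporary value
end

restored = task_local_storage(:verbosity)    # previous value is back

# A key that did not exist beforehand is deleted again afterwards.
task_local_storage(:scratch, "tmp") do
    nothing
end
scratch_left = haskey(task_local_storage(), :scratch)
```

The `hadkey` check in the listing is what distinguishes "restore the old value" from "delete the key", so a key whose old value was `nothing` is still restored rather than removed.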
303

304
# just wait for a task to be done, no error propagation
305
function _wait(t::Task)
1✔
306
    t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
1✔
307
    if !istaskdone(t)
1✔
308
        donenotify = t.donenotify::ThreadSynchronizer
1✔
309
        lock(donenotify)
1✔
310
        try
1✔
311
            while !istaskdone(t)
2✔
312
                wait(donenotify)
1✔
313
            end
1✔
314
        finally
315
            unlock(donenotify)
1✔
316
        end
317
    end
318
    nothing
×
319
end
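
`_wait` follows the usual condition-variable pattern: take the lock, re-check the predicate in a loop, and release the lock in `finally`. The sketch below reproduces that pattern with the public `Threads.Condition` instead of the internal `ThreadSynchronizer`; the `finished` flag is a stand-in for `istaskdone(t)` and is an assumption of this example:

```julia
cond = Threads.Condition()
finished = Ref(false)            # predicate guarded by `cond`'s lock

waiter = Threads.@spawn begin
    lock(cond)
    try
        # Re-check the predicate after every wakeup, as `_wait` does.
        while !finished[]
            wait(cond)
        end
    finally
        unlock(cond)
    end
    :woken
end

lock(cond)
try
    finished[] = true
    notify(cond)                 # wake every task blocked in `wait(cond)`
finally
    unlock(cond)
end

result = fetch(waiter)
```

Checking the predicate under the lock before the first `wait` also covers the case where the notifier runs before the waiter has started.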
320

321
# have `waiter` wait for `t`
322
function _wait2(t::Task, waiter::Task)
1✔
323
    if !istaskdone(t)
1✔
324
        # since _wait2 is similar to schedule, we should observe the sticky
325
        # bit, even if we don't call `schedule` with early-return below
326
        if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
1✔
327
            # Issue #41324
328
            # t.sticky && tid == 0 is a task that needs to be co-scheduled with
329
            # the parent task. If the parent (current_task) is not sticky we must
330
            # set it to be sticky.
331
            # XXX: Ideally we would be able to unset this
332
            current_task().sticky = true
×
333
            tid = Threads.threadid()
×
334
            ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
×
335
        end
336
        donenotify = t.donenotify::ThreadSynchronizer
1✔
337
        lock(donenotify)
1✔
338
        if !istaskdone(t)
1✔
339
            push!(donenotify.waitq, waiter)
1✔
340
            unlock(donenotify)
1✔
341
            return nothing
1✔
342
        else
343
            unlock(donenotify)
×
344
        end
345
    end
346
    schedule(waiter)
×
347
    nothing
×
348
end
349

350
"""
351
    wait(t::Task; throw=true)
352

353
Wait for a `Task` to finish.
354

355
The keyword `throw` (defaults to `true`) controls whether a failed task results
356
in an error, thrown as a [`TaskFailedException`](@ref) which wraps the failed task.
357

358
Throws a `ConcurrencyViolationError` if `t` is the currently running task, to prevent deadlocks.
359
"""
360
function wait(t::Task; throw=true)
241✔
361
    _wait(t)
241✔
362
    if throw && istaskfailed(t)
241✔
363
        Core.throw(TaskFailedException(t))
×
364
    end
365
    nothing
×
366
end
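
As the docstring above describes, waiting on a failed task raises a `TaskFailedException` that wraps the task rather than the original error. A minimal sketch of catching it and recovering the underlying exception (the `"boom"` error is illustrative):

```julia
t = Task(() -> error("boom"))
schedule(t)

caught = try
    wait(t)
    nothing
catch err
    err
end

# The wrapper points back at the failed task ...
same_task = caught isa TaskFailedException && caught.task === t
# ... and the original exception is reachable through that task.
original = caught.task.exception
```

On Julia versions that provide the `throw` keyword shown in the listing, `wait(t; throw=false)` skips the exception and leaves the check to `istaskfailed(t)`.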
367

368
# Wait for multiple tasks
369

370
"""
371
    waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)
372

373
Wait until at least one of the given tasks has completed.
374

375
If `throw` is `true`, throw `CompositeException` when one of the
376
completed tasks completes with an exception.
377

378
The return value consists of two task vectors. The first one consists of
379
completed tasks, and the other consists of uncompleted tasks.
380

381
!!! warning
382
    This may scale poorly compared to writing code that uses multiple individual tasks that
383
    each runs serially, since this needs to scan the list of `tasks` each time and
384
    synchronize with each one every time this is called. Or consider using
385
    [`waitall(tasks; failfast=true)`](@ref waitall) instead.
386
"""
387
waitany(tasks; throw=true) = _wait_multiple(tasks, throw)
×
388

389
"""
390
    waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)
391

392
Wait until all the given tasks have been completed.
393

394
If `failfast` is `true`, the function will return when at least one of the
395
given tasks is finished by exception. If `throw` is `true`, throw
396
`CompositeException` when one of the completed tasks has failed.
397

398
`failfast` and `throw` keyword arguments work independently; when only
399
`throw=true` is specified, this function waits for all the tasks to complete.
400

401
The return value consists of two task vectors. The first one consists of
402
completed tasks, and the other consists of uncompleted tasks.
403
"""
404
waitall(tasks; failfast=true, throw=true) = _wait_multiple(tasks, throw, true, failfast)
×
405

406
function _wait_multiple(waiting_tasks, throwexc=false, all=false, failfast=false)
×
407
    tasks = Task[]
×
408

409
    for t in waiting_tasks
×
410
        t isa Task || error("Expected an iterator of `Task` object")
×
411
        push!(tasks, t)
×
412
    end
×
413

414
    if (all && !failfast) || length(tasks) <= 1
×
415
        exception = false
×
416
        # Force everything to finish synchronously for the case of waitall
417
        # with failfast=false
418
        for t in tasks
×
419
            _wait(t)
×
420
            exception |= istaskfailed(t)
×
421
        end
×
422
        if exception && throwexc
×
423
            exceptions = [TaskFailedException(t) for t in tasks if istaskfailed(t)]
×
424
            throw(CompositeException(exceptions))
×
425
        else
426
            return tasks, Task[]
×
427
        end
428
    end
429

430
    exception = false
×
431
    nremaining::Int = length(tasks)
×
432
    done_mask = falses(nremaining)
×
433
    for (i, t) in enumerate(tasks)
×
434
        if istaskdone(t)
×
435
            done_mask[i] = true
×
436
            exception |= istaskfailed(t)
×
437
            nremaining -= 1
×
438
        else
439
            done_mask[i] = false
×
440
        end
441
    end
×
442

443
    if nremaining == 0
×
444
        return tasks, Task[]
×
445
    elseif any(done_mask) && (!all || (failfast && exception))
×
446
        if throwexc && (!all || failfast) && exception
×
447
            exceptions = [TaskFailedException(t) for t in tasks[done_mask] if istaskfailed(t)]
×
448
            throw(CompositeException(exceptions))
×
449
        else
450
            return tasks[done_mask], tasks[.~done_mask]
×
451
        end
452
    end
453

454
    chan = Channel{Int}(Inf)
×
455
    sentinel = current_task()
×
456
    waiter_tasks = fill(sentinel, length(tasks))
×
457

458
    for (i, done) in enumerate(done_mask)
×
459
        done && continue
×
460
        t = tasks[i]
×
461
        if istaskdone(t)
×
462
            done_mask[i] = true
×
463
            exception |= istaskfailed(t)
×
464
            nremaining -= 1
×
465
            exception && failfast && break
×
466
        else
467
            waiter = @task put!(chan, i)
×
468
            waiter.sticky = false
×
469
            _wait2(t, waiter)
×
470
            waiter_tasks[i] = waiter
×
471
        end
472
    end
×
473

474
    while nremaining > 0
×
475
        i = take!(chan)
×
476
        t = tasks[i]
×
477
        waiter_tasks[i] = sentinel
×
478
        done_mask[i] = true
×
479
        exception |= istaskfailed(t)
×
480
        nremaining -= 1
×
481

482
        # stop early if requested, unless there is something immediately
483
        # ready to consume from the channel (using a race-y check)
484
        if (!all || (failfast && exception)) && !isready(chan)
×
485
            break
×
486
        end
487
    end
×
488

489
    close(chan)
×
490

491
    if nremaining == 0
×
492
        return tasks, Task[]
×
493
    else
494
        remaining_mask = .~done_mask
×
495
        for i in findall(remaining_mask)
×
496
            waiter = waiter_tasks[i]
×
497
            donenotify = tasks[i].donenotify::ThreadSynchronizer
×
498
            @lock donenotify Base.list_deletefirst!(donenotify.waitq, waiter)
×
499
        end
×
500
        done_tasks = tasks[done_mask]
×
501
        if throwexc && exception
×
502
            exceptions = [TaskFailedException(t) for t in done_tasks if istaskfailed(t)]
×
503
            throw(CompositeException(exceptions))
×
504
        else
505
            return done_tasks, tasks[remaining_mask]
×
506
        end
507
    end
508
end
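
A hedged usage sketch for `waitall` as defined above. It assumes a Julia version that ships this function, so the call is guarded and falls back to waiting on each task individually; the squaring workload is made up for illustration:

```julia
tasks = map(i -> Threads.@spawn(i^2), 1:3)

if isdefined(Base, :waitall)
    # Returns (done_tasks, remaining_tasks); both are vectors of tasks.
    done, remaining = Base.waitall(tasks)
else
    # Fallback for versions without `waitall` (assumption of this sketch).
    foreach(wait, tasks)
    done, remaining = tasks, Task[]
end

squares = fetch.(done)
```

When no task fails, `_wait_multiple` returns the collected task vector itself as the first element, so the results come back in the order the tasks were passed in.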
509

510
"""
511
    fetch(x::Any)
512

513
Return `x`.
514
"""
515
fetch(@nospecialize x) = x
×
516

517
"""
518
    fetch(t::Task)
519

520
Wait for a [`Task`](@ref) to finish, then return its result value.
521
If the task fails with an exception, a [`TaskFailedException`](@ref) (which wraps the failed task)
522
is thrown.
523
"""
524
function fetch(t::Task)
525
    wait(t)
60✔
526
    return task_result(t)
60✔
527
end
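
The two `fetch` methods above make it possible to treat plain values and tasks uniformly. A tiny sketch, with the arithmetic chosen only for illustration:

```julia
plain = fetch(42)                        # non-task values pass through unchanged
computed = fetch(Threads.@spawn 6 * 7)   # blocks until the task has finished
```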
528

529

530
## lexically-scoped waiting for multiple items
531

532
struct ScheduledAfterSyncException <: Exception
533
    values::Vector{Any}
×
534
end
535

536
function showerror(io::IO, ex::ScheduledAfterSyncException)
×
537
    print(io, "ScheduledAfterSyncException: ")
×
538
    if isempty(ex.values)
×
539
        print(io, "(no values)")
×
540
        return
×
541
    end
542
    show(io, ex.values[1])
×
543
    if length(ex.values) == 1
×
544
        print(io, " is")
×
545
    elseif length(ex.values) == 2
×
546
        print(io, " and one more ")
×
547
        print(io, nameof(typeof(ex.values[2])))
×
548
        print(io, " are")
×
549
    else
550
        print(io, " and ", length(ex.values) - 1, " more objects are")
×
551
    end
552
    print(io, " registered after the end of a `@sync` block")
×
553
end
554

555
function sync_end(c::Channel{Any})
×
556
    local c_ex
×
557
    while isready(c)
×
558
        r = take!(c)
×
559
        if isa(r, Task)
×
560
            _wait(r)
×
561
            if istaskfailed(r)
×
562
                if !@isdefined(c_ex)
×
563
                    c_ex = CompositeException()
×
564
                end
565
                push!(c_ex, TaskFailedException(r))
×
566
            end
567
        else
568
            try
×
569
                wait(r)
×
570
            catch e
571
                if !@isdefined(c_ex)
×
572
                    c_ex = CompositeException()
×
573
                end
574
                push!(c_ex, e)
×
575
            end
576
        end
577
    end
×
578
    close(c)
×
579

580
    # Capture all waitable objects scheduled after the end of `@sync` and
581
    # include them in the exception. This way, the user can check what was
582
    # scheduled by examining the exception object.
583
    if isready(c)
×
584
        local racy
585
        for r in c
×
586
            if !@isdefined(racy)
×
587
                racy = []
×
588
            end
589
            push!(racy, r)
×
590
        end
×
591
        if @isdefined(racy)
×
592
            if !@isdefined(c_ex)
×
593
                c_ex = CompositeException()
×
594
            end
595
            # Since this is a clear programming error, show this exception first:
596
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
×
597
        end
598
    end
599

600
    if @isdefined(c_ex)
×
601
        throw(c_ex)
×
602
    end
603
    nothing
×
604
end
605

606
const sync_varname = gensym(:sync)
607

608
"""
609
    @sync
610

611
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn),
612
`Distributed.@spawnat` and `Distributed.@distributed`
613
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
614
a [`CompositeException`](@ref).
615

616
# Examples
617
```julia-repl
618
julia> Threads.nthreads()
619
4
620

621
julia> @sync begin
622
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
623
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
624
       end;
625
Thread-id 3, task 1
626
Thread-id 1, task 2
627
```
628
"""
629
macro sync(block)
630
    var = esc(sync_varname)
631
    quote
632
        let $var = Channel(Inf)
12✔
633
            v = $(esc(block))
75✔
634
            sync_end($var)
12✔
635
            v
12✔
636
        end
637
    end
638
end
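
Two aspects of `@sync` described above can be sketched briefly: it returns only after every lexically enclosed task has finished, and failures are collected into one `CompositeException`. The workloads and error messages below are illustrative:

```julia
results = zeros(Int, 4)

@sync for i in 1:4
    Threads.@spawn begin
        results[i] = i * 10      # each task writes only its own slot
    end
end
# Every spawned task has finished once `@sync` returns.

failure = try
    @sync begin
        Threads.@spawn error("first")
        Threads.@spawn error("second")
    end
    nothing
catch err
    err
end
```

Each entry of `failure.exceptions` is a `TaskFailedException`, because `sync_end` wraps failed tasks before pushing them into the composite.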
639

640
# schedule an expression to run asynchronously
641

642
"""
643
    @async
644

645
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
646

647
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
648
constructed underlying closure. This allows you to insert the _value_ of a variable,
649
isolating the asynchronous code from changes to the variable's value in the current task.
650

651
!!! warning
652
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
653
    parallelism is required** especially in publicly distributed libraries.  This is
654
    because a use of `@async` disables the migration of the *parent* task across worker
655
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
656
    `@async` in a library function can have a large impact on the performance of very
657
    different parts of user applications.
658

659
!!! compat "Julia 1.4"
660
    Interpolating values via `\$` is available as of Julia 1.4.
661
"""
662
macro async(expr)
663
    do_async_macro(expr, __source__)
664
end
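
The interpolation behavior described in the `@async` docstring can be sketched as follows. The helper name `interpolation_demo` and the short `sleep` are assumptions for illustration only:

```julia
function interpolation_demo()
    x = 1
    # `$x` copies the current value of `x` into the task's closure.
    by_value = @async begin
        sleep(0.05)
        $x
    end
    x = 2            # rebinding afterwards does not affect `by_value`
    return fetch(by_value)
end

result = interpolation_demo()
```

Without the `$`, the task would read `x` when it eventually runs and would most likely see the later value instead.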
665

666
# generate the code for @async, possibly wrapping the task in something before
667
# pushing it to the wait queue.
668
function do_async_macro(expr, linenums; wrap=identity)
669
    letargs = Base._lift_one_interp!(expr)
670

671
    thunk = Base.replace_linenums!(:(()->($(esc(expr)))), linenums)
672
    var = esc(sync_varname)
673
    quote
674
        let $(letargs...)
×
675
            local task = Task($thunk)
599✔
676
            if $(Expr(:islocal, var))
586✔
677
                put!($var, $(wrap(:task)))
98✔
678
            end
679
            schedule(task)
586✔
680
            task
586✔
681
        end
682
    end
683
end
684

685
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
686
struct UnwrapTaskFailedException <: Exception
687
    task::Task
×
688
end
689

690
# common code for wait&fetch for UnwrapTaskFailedException
691
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
×
692
    try
×
693
        f(t.task)
×
694
    catch ex
695
        if ex isa TaskFailedException
×
696
            throw(ex.task.exception)
×
697
        else
698
            rethrow()
×
699
        end
700
    end
701
end
702

703
# the unwrapping for above task wrapper (gets triggered in sync_end())
704
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
×
705

706
# same for fetching the tasks, for convenience
707
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
708

709
# macro for running async code that doesn't throw wrapped exceptions
710
macro async_unwrap(expr)
711
    do_async_macro(expr, __source__, wrap=task->:(Base.UnwrapTaskFailedException($task)))
×
712
end
713

714
"""
715
    errormonitor(t::Task)
716

717
Print an error log to `stderr` if task `t` fails.
718

719
# Examples
720
```julia-repl
721
julia> wait(errormonitor(Threads.@spawn error("task failed")); throw = false)
722
Unhandled Task ERROR: task failed
723
Stacktrace:
724
[...]
725
```
726
"""
727
function errormonitor(t::Task)
728
    t2 = Task() do
307✔
729
        if istaskfailed(t)
1✔
730
            local errs = stderr
×
731
            try # try to display the failure atomically
×
732
                errio = IOContext(PipeBuffer(), errs::IO)
×
733
                emphasize(errio, "Unhandled Task ")
×
734
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
×
735
                write(errs, errio)
×
736
            catch
737
                try # try to display the secondary error atomically
×
738
                    errio = IOContext(PipeBuffer(), errs::IO)
×
739
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
740
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
741
                    write(errs, errio)
×
742
                    flush(errs)
×
743
                    # and then the actual error, as best we can
744
                    Core.print(Core.stderr, "while handling: ")
×
745
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
746
                catch e
747
                    # give up
748
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
749
                            " while trying to print a failed Task notice; giving up\n")
750
                end
751
            end
752
        end
753
        nothing
1✔
754
    end
755
    t2.sticky = false
307✔
756
    _wait2(t, t2)
307✔
757
    return t
×
758
end
759

760
# Capture interpolated variables in $() and move them to let-block
761
function _lift_one_interp!(e)
×
762
    letargs = Any[]  # store the new gensymed arguments
×
763
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
×
764
    letargs
×
765
end
766
_lift_one_interp_helper(v, _, _) = v
×
767
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
×
768
    if expr.head === :$
×
769
        if in_quote_context  # This $ is simply interpolating out of the quote
×
770
            # Now, we're out of the quote, so any _further_ $ is ours.
771
            in_quote_context = false
×
772
        else
773
            newarg = gensym()
×
774
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
×
775
            return newarg  # Don't recurse into the lifted $() exprs
×
776
        end
777
    elseif expr.head === :quote
×
778
        in_quote_context = true   # Don't try to lift $ directly out of quotes
×
779
    elseif expr.head === :macrocall
×
780
        return expr  # Don't recur into macro calls, since some other macros use $
×
781
    end
782
    for (i,e) in enumerate(expr.args)
×
783
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
×
784
    end
×
785
    expr
×
786
end
787

788

789
# add a wait-able object to the sync pool
790
macro sync_add(expr)
791
    var = esc(sync_varname)
792
    quote
793
        local ref = $(esc(expr))
794
        put!($var, ref)
795
        ref
796
    end
797
end
798

799
function repl_backend_task()
800
    @isdefined(active_repl_backend) || return
×
801
    backend = active_repl_backend
×
802
    isdefined(backend, :backend_task) || return
×
803
    backend_task = getfield(active_repl_backend, :backend_task)::Task
×
804
    if backend_task._state === task_state_runnable && getfield(backend, :in_eval)
×
805
        return backend_task
×
806
    end
807
    return
×
808
end
809

810
# runtime system hook called when a task finishes
811
function task_done_hook(t::Task)
3✔
812
    # `finish_task` sets `sigatomic` before entering this function
813
    err = istaskfailed(t)
3✔
814
    result = task_result(t)
3✔
815
    handled = false
×
816

817
    donenotify = t.donenotify
3✔
818
    if isa(donenotify, ThreadSynchronizer)
3✔
819
        lock(donenotify)
2✔
820
        try
2✔
821
            if !isempty(donenotify.waitq)
2✔
822
                handled = true
1✔
823
                notify(donenotify)
1✔
824
            end
825
        finally
826
            unlock(donenotify)
2✔
827
        end
828
    end
829

830
    if err && !handled && Threads.threadid() == 1
3✔
831
        if isa(result, InterruptException) && isempty(Workqueue)
×
832
            backend = repl_backend_task()
×
833
            backend isa Task && throwto(backend, result)
×
834
        end
835
    end
836
    # Clear sigatomic before waiting
837
    sigatomic_end()
3✔
838
    try
3✔
839
        wait() # this will not return
3✔
840
    catch e
841
        # If an InterruptException happens while blocked in the event loop, try handing
842
        # the exception to the REPL task since the current task is done.
843
        # issue #19467
844
        if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue)
×
845
            backend = repl_backend_task()
×
846
            backend isa Task && throwto(backend, e)
×
847
        end
848
        rethrow() # this will terminate the program
×
849
    end
850
end
851

852
function init_task_lock(t::Task) # Function only called from jl_adopt_thread so foreign tasks have a lock.
×
853
    if t.donenotify === nothing
×
854
        t.donenotify = ThreadSynchronizer()
×
855
    end
856
end
857

858
## scheduler and work queue
859

860
mutable struct IntrusiveLinkedListSynchronized{T}
861
    queue::IntrusiveLinkedList{T}
862
    lock::Threads.SpinLock
863
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
1✔
864
end
865
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
3,388✔
866
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
×
867
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
237✔
868
    lock(W.lock)
237✔
869
    try
237✔
870
        push!(W.queue, t)
238✔
871
    finally
872
        unlock(W.lock)
237✔
873
    end
874
    return W
237✔
875
end
876
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
×
877
    lock(W.lock)
×
878
    try
×
879
        pushfirst!(W.queue, t)
×
880
    finally
881
        unlock(W.lock)
×
882
    end
883
    return W
×
884
end
885
function pop!(W::IntrusiveLinkedListSynchronized)
×
886
    lock(W.lock)
×
887
    try
×
888
        return pop!(W.queue)
×
889
    finally
890
        unlock(W.lock)
×
891
    end
892
end
893
function popfirst!(W::IntrusiveLinkedListSynchronized)
237✔
894
    lock(W.lock)
237✔
895
    try
237✔
896
        return popfirst!(W.queue)
238✔
897
    finally
898
        unlock(W.lock)
237✔
899
    end
900
end
901
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
×
902
    lock(W.lock)
×
903
    try
×
904
        list_deletefirst!(W.queue, t)
×
905
    finally
906
        unlock(W.lock)
×
907
    end
908
    return W
×
909
end
910

911
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
912
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
913
const Workqueues_lock = Threads.SpinLock()
914
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
915

916
function workqueue_for(tid::Int)
475✔
917
    qs = Workqueues
475✔
918
    if length(qs) >= tid && isassigned(qs, tid)
475✔
919
        return @inbounds qs[tid]
474✔
920
    end
921
    # slow path to allocate it
922
    @assert tid > 0
1✔
923
    l = Workqueues_lock
×
924
    @lock l begin
1✔
925
        qs = Workqueues
1✔
926
        if length(qs) < tid
1✔
927
            nt = Threads.maxthreadid()
1✔
928
            @assert tid <= nt
1✔
929
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
2✔
930
        end
931
        if !isassigned(qs, tid)
1✔
932
            @inbounds qs[tid] = StickyWorkqueue()
1✔
933
        end
934
        return @inbounds qs[tid]
1✔
935
    end
936
end
937

938
function enq_work(t::Task)
237✔
939
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
237✔
940

941
    # Sticky tasks go into their thread's work queue.
942
    if t.sticky
237✔
943
        tid = Threads.threadid(t)
234✔
944
        if tid == 0
234✔
945
            # The task is not yet stuck to a thread. Stick it to the current
946
            # thread and do the same to the parent task (the current task) so
947
            # that the tasks are correctly co-scheduled (issue #41324).
948
            # XXX: Ideally we would be able to unset this.
949
            if GC.in_finalizer()
×
950
                # The task was launched in a finalizer. There is no thread to stick it
951
                # to, so just allow it to run anywhere as if it had been non-sticky.
952
                t.sticky = false
×
953
                @goto not_sticky
×
954
            else
955
                tid = Threads.threadid()
×
956
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
×
957
                current_task().sticky = true
×
958
            end
959
        end
960
        push!(workqueue_for(tid), t)
234✔
961
    else
962
        @label not_sticky
963
        tp = Threads.threadpool(t)
5✔
964
        if tp === :foreign || Threads.threadpoolsize(tp) == 1
8✔
965
            # There's only one thread in the task's assigned thread pool;
966
            # use its work queue.
967
            tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
5✔
968
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
3✔
969
            push!(workqueue_for(tid), t)
3✔
970
        else
971
            # Otherwise, put the task in the multiqueue.
972
            Partr.multiq_insert(t, t.priority)
×
973
            tid = 0
×
974
        end
975
    end
976
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
237✔
977
    return t
237✔
978
end
979

980
function schedule(t::Task)
981
    # [task] created -scheduled-> wait_time
982
    maybe_record_enqueued!(t)
612✔
983
    enq_work(t)
612✔
984
end
985

986
"""
987
    schedule(t::Task, [val]; error=false)
988

989
Add a [`Task`](@ref) to the scheduler's queue. The task then runs whenever the system
990
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
991

992
If a second argument `val` is provided, it will be passed to the task (via the return value of
993
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
994
the woken task.
995

996
!!! warning
997
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
998
    See [the API reference](@ref low-level-schedule-wait) for more information.
999

1000
!!! warning
1001
    By default, tasks have the sticky bit (`t.sticky`) set to `true`. This models the
1002
    historic default for [`@async`](@ref). Sticky tasks can only be run on the worker thread
1003
    they are first scheduled on, and when scheduled will make the task that they were scheduled
1004
    from sticky. To obtain the behavior of [`Threads.@spawn`](@ref) set the sticky
1005
    bit manually to `false`.
1006

1007
# Examples
1008
```jldoctest
1009
julia> a5() = sum(i for i in 1:1000);
1010

1011
julia> b = Task(a5);
1012

1013
julia> istaskstarted(b)
1014
false
1015

1016
julia> schedule(b);
1017

1018
julia> yield();
1019

1020
julia> istaskstarted(b)
1021
true
1022

1023
julia> istaskdone(b)
1024
true
1025
```
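
The two-argument form with `error=true` can be sketched as follows. This is a minimal
illustration, assuming the target task is parked in a bare `wait()` and is not in any queue,
which is the low-level pattern the first warning above refers to. The names `parked` and
`outcome` are hypothetical.

```julia
# Hypothetical task that blocks until another task resumes it.
parked = Task() do
    try
        wait()          # park here; nothing has queued this task again yet
    catch err
        return err      # hand the injected exception back as the task result
    end
end

schedule(parked)        # queue the task for its first run
yield()                 # let it run until it blocks in wait()

# Resume the parked task by raising an exception inside its wait() call.
schedule(parked, ErrorException("stop"); error=true)
outcome = fetch(parked)
```

Here `outcome` holds whatever the task returned from its `catch` branch, so the exception is
observed as a value rather than as a task failure.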
1026
"""
1027
function schedule(t::Task, @nospecialize(arg); error=false)
476✔
1028
    # schedule a task to be (re)started with the given value or exception
1029
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
236✔
1030
    if error
236✔
UNCOV
1031
        q = t.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, t)
×
UNCOV
1032
        setfield!(t, :result, arg)
×
UNCOV
1033
        setfield!(t, :_isexception, true)
×
1034
    else
1035
        t.queue === nothing || Base.error("schedule: Task not runnable")
236✔
1036
        setfield!(t, :result, arg)
236✔
1037
    end
1038
    # [task] created -scheduled-> wait_time
1039
    maybe_record_enqueued!(t)
236✔
1040
    enq_work(t)
236✔
1041
    return t
236✔
1042
end
1043

1044
"""
1045
    yield()
1046

1047
Switch to the scheduler to allow another scheduled task to run. A task that calls this
1048
function is still runnable, and will be restarted immediately if there are no other runnable
1049
tasks.
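
# Examples
A minimal sketch of cooperative switching, assuming both tasks share one thread (the default
for sticky tasks). The names `events` and `worker` are illustrative only, and the exact
interleaving depends on what else is queued.

```julia
events = String[]

# Hypothetical worker that yields after every step.
worker = Task() do
    for i in 1:2
        push!(events, "worker step " * string(i))
        yield()         # stay runnable, but let other queued tasks go first
    end
end

schedule(worker)
while !istaskdone(worker)
    push!(events, "main")
    yield()             # the calling task is re-queued and resumed later
end
```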
1050
"""
UNCOV
1051
function yield()
×
UNCOV
1052
    ct = current_task()
×
UNCOV
1053
    enq_work(ct)
×
UNCOV
1054
    try
×
UNCOV
1055
        wait()
×
1056
    catch
1057
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
×
1058
        rethrow()
×
1059
    end
1060
end
1061

1062
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
237✔
1063

1064
"""
1065
    yield(t::Task, arg = nothing)
1066

1067
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
1068
immediately yields to `t` before calling the scheduler.
1069

1070
Throws a `ConcurrencyViolationError` if `t` is the currently running task.
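
# Examples
A minimal sketch, assuming the target task is not in any queue when `yield(t, arg)` is
called: first because it has never run, then because it is parked in a bare `wait()`. The
name `adder` is hypothetical.

```julia
# Hypothetical task that parks and later receives a value through wait().
adder = Task() do
    v = wait()          # returns the value handed over when the task is resumed
    return v + 1
end

yield(adder)            # switch to `adder` at once; it parks in wait() and control returns
yield(adder, 41)        # resume `adder` at once, with 41 as the result of its wait()
result = fetch(adder)
```

In both calls the current task is placed back on its work queue before the switch, so it
resumes once `adder` blocks or finishes.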
1071
"""
UNCOV
1072
function yield(t::Task, @nospecialize(x=nothing))
×
UNCOV
1073
    ct = current_task()
×
UNCOV
1074
    t === ct && throw(ConcurrencyViolationError("Cannot yield to currently running task!"))
×
UNCOV
1075
    (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable"))
×
1076
    # [task] user_time -yield-> wait_time
UNCOV
1077
    record_running_time!(ct)
×
1078
    # [task] created -scheduled-> wait_time
UNCOV
1079
    maybe_record_enqueued!(t)
×
UNCOV
1080
    t.result = x
×
UNCOV
1081
    enq_work(ct)
×
UNCOV
1082
    set_next_task(t)
×
UNCOV
1083
    return try_yieldto(ensure_rescheduled)
×
1084
end
1085

1086
"""
1087
    yieldto(t::Task, arg = nothing)
1088

1089
Switch to the given task. The first time a task is switched to, the task's function is
1090
called with no arguments. On subsequent switches, `arg` is returned from the task's last
1091
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
1092
or scheduling in any way. Its use is discouraged.
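
# Examples
A minimal sketch of symmetric switching between two tasks, shown only to illustrate how `arg`
travels. It assumes nothing else schedules either task in the meantime; the names `main` and
`partner` are hypothetical.

```julia
main = current_task()

# Hypothetical partner task that hands control straight back to `main`.
partner = Task() do
    x = yieldto(main, :hello)   # switch to main; resumes when main switches back
    yieldto(main, x + 1)        # send a value derived from what main passed in
end

first_value = yieldto(partner)        # starts `partner`; receives what it passes back
second_value = yieldto(partner, 41)   # 41 becomes the result of partner's first yieldto
```

The partner task is left suspended after its second switch and never finishes, which is one
reason this style is discouraged in favor of `schedule`, `wait`, and `Channel`.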
1093
"""
1094
function yieldto(t::Task, @nospecialize(x=nothing))
×
1095
    ct = current_task()
×
1096
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
1097
    # state error instead.
1098
    if t._state === task_state_done
×
1099
        return x
×
1100
    elseif t._state === task_state_failed
×
1101
        throw(t.result)
×
1102
    end
1103
    # [task] user_time -yield-> wait_time
1104
    record_running_time!(ct)
×
1105
    # [task] created -scheduled-unfairly-> wait_time
1106
    maybe_record_enqueued!(t)
×
1107
    t.result = x
×
1108
    set_next_task(t)
×
1109
    return try_yieldto(identity)
×
1110
end
1111

1112
function try_yieldto(undo)
237✔
1113
    try
237✔
1114
        ccall(:jl_switch, Cvoid, ())
237✔
1115
    catch
1116
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
×
1117
        rethrow()
×
1118
    end
1119
    ct = current_task()
235✔
1120
    # [task] wait_time -(re)started-> user_time
1121
    if ct.metrics_enabled
235✔
1122
        @atomic :monotonic ct.last_started_running_at = time_ns()
×
1123
    end
1124
    if ct._isexception
235✔
UNCOV
1125
        exc = ct.result
×
UNCOV
1126
        ct.result = nothing
×
UNCOV
1127
        ct._isexception = false
×
UNCOV
1128
        throw(exc)
×
1129
    end
1130
    result = ct.result
235✔
1131
    ct.result = nothing
235✔
1132
    return result
235✔
1133
end
1134

1135
# yield to a task, throwing an exception in it
1136
function throwto(t::Task, @nospecialize exc)
×
1137
    ct = current_task()
×
1138
    # [task] user_time -yield-> wait_time
1139
    record_running_time!(ct)
×
1140
    # [task] created -scheduled-unfairly-> wait_time
1141
    maybe_record_enqueued!(t)
×
1142
    t.result = exc
×
1143
    t._isexception = true
×
1144
    set_next_task(t)
×
1145
    return try_yieldto(identity)
×
1146
end
1147

1148
function ensure_rescheduled(othertask::Task)
×
1149
    ct = current_task()
×
1150
    W = workqueue_for(Threads.threadid())
×
1151
    if ct !== othertask && othertask._state === task_state_runnable
×
1152
        # we failed to yield to othertask
1153
        # return it to the head of a queue to be retried later
1154
        tid = Threads.threadid(othertask)
×
1155
        Wother = tid == 0 ? W : workqueue_for(tid)
×
1156
        pushfirst!(Wother, othertask)
×
1157
    end
1158
    # if the current task was queued,
1159
    # also need to return it to the runnable state
1160
    # before throwing an error
1161
    list_deletefirst!(W, ct)
×
1162
    nothing
×
1163
end
1164

1165
function trypoptask(W::StickyWorkqueue)
3,388✔
1166
    while !isempty(W)
3,388✔
1167
        t = popfirst!(W)
237✔
1168
        if t._state !== task_state_runnable
237✔
1169
            # assume this somehow got queued twice,
1170
            # probably broken now, but try discarding this switch and keep going
1171
            # can't throw here, because it's probably not the fault of the caller to wait
1172
            # and don't want to use print() here, because that may try to incur a task switch
1173
            ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
×
1174
                "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
1175
            continue
×
1176
        end
1177
        return t
237✔
1178
    end
×
1179
    return Partr.multiq_deletemin()
3,152✔
1180
end
1181

1182
checktaskempty = Partr.multiq_check_empty
1183

1184
@noinline function poptask(W::StickyWorkqueue)
238✔
1185
    task = trypoptask(W)
238✔
1186
    if !(task isa Task)
238✔
1187
        task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
237✔
1188
    end
1189
    set_next_task(task)
237✔
1190
    nothing
×
1191
end
1192

1193
function wait()
238✔
1194
    ct = current_task()
238✔
1195
    # [task] user_time -yield-or-done-> wait_time
1196
    record_running_time!(ct)
238✔
1197
    GC.safepoint()
238✔
1198
    W = workqueue_for(Threads.threadid())
238✔
1199
    poptask(W)
238✔
1200
    result = try_yieldto(ensure_rescheduled)
237✔
1201
    process_events()
235✔
1202
    # return when we come out of the queue
1203
    return result
235✔
1204
end
1205

1206
if Sys.iswindows()
1207
    pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
×
1208
else
1209
    pause() = ccall(:pause, Cvoid, ())
×
1210
end
1211

1212
# update the `running_time_ns` field of `t` to include the time since it last started running.
1213
function record_running_time!(t::Task)
1214
    if t.metrics_enabled && !istaskdone(t)
238✔
1215
        @atomic :monotonic t.running_time_ns += time_ns() - t.last_started_running_at
×
1216
    end
1217
    return t
238✔
1218
end
1219

1220
# if this is the first time `t` has been added to the run queue
1221
# (or the first time it has been unfairly yielded to without being added to the run queue)
1222
# then set the `first_enqueued_at` field to the current time.
1223
function maybe_record_enqueued!(t::Task)
1224
    if t.metrics_enabled && t.first_enqueued_at == 0
848✔
1225
        @atomic :monotonic t.first_enqueued_at = time_ns()
×
1226
    end
1227
    return t
848✔
1228
end