• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37919

29 Sep 2024 09:41AM UTC coverage: 86.232% (-0.3%) from 86.484%
#37919

push

local

web-flow
fix rawbigints OOB issues (#55917)

Fixes issues introduced in #50691 and found in #55906:
* use `@inbounds` and `@boundscheck` macros in rawbigints, for catching
OOB with `--check-bounds=yes`
* fix OOB in `truncate`

12 of 13 new or added lines in 1 file covered. (92.31%)

1287 existing lines in 41 files now uncovered.

77245 of 89578 relevant lines covered (86.23%)

15686161.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.72
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
326,124✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
1✔
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

16
        # Process bt_raw so that it can be safely serialized
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
2✔
18
        CapturedException(ex, bt_lines)
2✔
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
2✔
22
end
23

24
function showerror(io::IO, ce::CapturedException)
×
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
×
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
35
"""
36
capture_exception(ex, bt) = CapturedException(ex, bt)
1✔
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
1✔
49
    CompositeException(exceptions) = new(exceptions)
×
50
end
51
length(c::CompositeException) = length(c.exceptions)
4✔
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
2✔
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
2✔
54
isempty(c::CompositeException) = isempty(c.exceptions)
4✔
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
3✔
56

57
function showerror(io::IO, ex::CompositeException)
2✔
58
    if !isempty(ex)
2✔
59
        showerror(io, ex.exceptions[1])
1✔
60
        remaining = length(ex) - 1
1✔
61
        if remaining > 0
1✔
62
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
1✔
63
        end
64
    else
65
        print(io, "CompositeException()\n")
1✔
66
    end
67
end
68

69
"""
70
    TaskFailedException
71

72
This exception is thrown by a [`wait(t)`](@ref) call when task `t` fails.
73
`TaskFailedException` wraps the failed task `t`.
74
"""
75
struct TaskFailedException <: Exception
76
    task::Task
123✔
77
end
78

UNCOV
79
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
×
UNCOV
80
    print(io, "TaskFailedException")
×
UNCOV
81
    if bt !== nothing && backtrace
×
UNCOV
82
        show_backtrace(io, bt)
×
83
    end
UNCOV
84
    println(io)
×
UNCOV
85
    printstyled(io, "\n    nested task error: ", color=error_color())
×
UNCOV
86
    show_task_exception(io, ex.task)
×
87
end
88

UNCOV
89
function show_task_exception(io::IO, t::Task; indent = true)
×
UNCOV
90
    stack = current_exceptions(t)
×
UNCOV
91
    b = IOBuffer()
×
UNCOV
92
    if isempty(stack)
×
93
        # exception stack buffer not available; probably a serialized task
94
        showerror(IOContext(b, io), t.result)
×
95
    else
UNCOV
96
        show_exception_stack(IOContext(b, io), stack)
×
97
    end
UNCOV
98
    str = String(take!(b))
×
UNCOV
99
    if indent
×
UNCOV
100
        str = replace(str, "\n" => "\n    ")
×
101
    end
UNCOV
102
    print(io, str)
×
103
end
104

105
function show(io::IO, t::Task)
×
106
    state = t.state
×
107
    state_str = "$state" * ((state == :runnable && istaskstarted(t)) ? ", started" : "")
×
108
    print(io, "Task ($state_str) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
109
end
110

111
"""
112
    @task
113

114
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
115
creates a task, and does not run it.
116

117
!!! warning
118
    By default tasks will have the sticky bit set to true `t.sticky`. This models the
119
    historic default for [`@async`](@ref). Sticky tasks can only be run on the worker thread
120
    they are first scheduled on, and when scheduled will make the task that they were scheduled
121
    from sticky. To obtain the behavior of [`Threads.@spawn`](@ref) set the sticky
122
    bit manually to `false`.
123

124
# Examples
125
```jldoctest
126
julia> a1() = sum(i for i in 1:1000);
127

128
julia> b = @task a1();
129

130
julia> istaskstarted(b)
131
false
132

133
julia> schedule(b);
134

135
julia> yield();
136

137
julia> istaskdone(b)
138
true
139
```
140
"""
141
macro task(ex)
19✔
142
    thunk = Base.replace_linenums!(:(()->$(esc(ex))), __source__)
19✔
143
    :(Task($thunk))
19✔
144
end
145

146
"""
147
    current_task()
148

149
Get the currently running [`Task`](@ref).
150
"""
151
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
666,392,037✔
152

153
# task states
154

155
const task_state_runnable = UInt8(0)
156
const task_state_done     = UInt8(1)
157
const task_state_failed   = UInt8(2)
158

159
@inline function getproperty(t::Task, field::Symbol)
21✔
160
    if field === :state
3,550✔
161
        # TODO: this field name should be deprecated in 2.0
162
        st = @atomic :acquire t._state
9✔
163
        if st === task_state_runnable
9✔
164
            return :runnable
1✔
165
        elseif st === task_state_done
8✔
166
            return :done
5✔
167
        elseif st === task_state_failed
3✔
168
            return :failed
3✔
169
        else
170
            @assert false
×
171
        end
172
    elseif field === :backtrace
3,541✔
173
        # TODO: this field name should be deprecated in 2.0
174
        return current_exceptions(t)[end][2]
×
175
    elseif field === :exception
3,541✔
176
        # TODO: this field name should be deprecated in 2.0
177
        return t._isexception ? t.result : nothing
5✔
178
    elseif field === :scope
3,536✔
179
        error("""
1✔
180
            Querying a Task's `scope` field is disallowed.
181
            The private `Core.current_scope()` function is better, though still an implementation detail.""")
182
    else
183
        return getfield(t, field)
221,354,297✔
184
    end
185
end
186

187
@inline function setproperty!(t::Task, field::Symbol, @nospecialize(v))
2✔
188
    if field === :scope
277,177,189✔
189
        istaskstarted(t) && error("Setting scope on a started task directly is disallowed.")
×
190
    end
191
    return @invoke setproperty!(t::Any, field::Symbol, v::Any)
285,313,917✔
192
end
193

194
"""
195
    istaskdone(t::Task) -> Bool
196

197
Determine whether a task has exited.
198

199
# Examples
200
```jldoctest
201
julia> a2() = sum(i for i in 1:1000);
202

203
julia> b = Task(a2);
204

205
julia> istaskdone(b)
206
false
207

208
julia> schedule(b);
209

210
julia> yield();
211

212
julia> istaskdone(b)
213
true
214
```
215
"""
216
istaskdone(t::Task) = (@atomic :acquire t._state) !== task_state_runnable
166,553✔
217

218
"""
219
    istaskstarted(t::Task) -> Bool
220

221
Determine whether a task has started executing.
222

223
# Examples
224
```jldoctest
225
julia> a3() = sum(i for i in 1:1000);
226

227
julia> b = Task(a3);
228

229
julia> istaskstarted(b)
230
false
231
```
232
"""
233
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
3✔
234

235
"""
236
    istaskfailed(t::Task) -> Bool
237

238
Determine whether a task has exited because an exception was thrown.
239

240
# Examples
241
```jldoctest
242
julia> a4() = error("task failed");
243

244
julia> b = Task(a4);
245

246
julia> istaskfailed(b)
247
false
248

249
julia> schedule(b);
250

251
julia> yield();
252

253
julia> istaskfailed(b)
254
true
255
```
256

257
!!! compat "Julia 1.3"
258
    This function requires at least Julia 1.3.
259
"""
260
istaskfailed(t::Task) = ((@atomic :acquire t._state) === task_state_failed)
316,230✔
261

262
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
1,899,836✔
263
function Threads.threadpool(t::Task)
1✔
264
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
12,187✔
265
    return Threads._tpid_to_sym(tpid)
24,365✔
266
end
267

268
task_result(t::Task) = t.result
162,280✔
269

270
task_local_storage() = get_task_tls(current_task())
68,042,894✔
271
function get_task_tls(t::Task)
272
    if t.storage === nothing
68,042,894✔
273
        t.storage = IdDict()
676✔
274
    end
275
    return (t.storage)::IdDict{Any,Any}
68,042,894✔
276
end
277

278
"""
279
    task_local_storage(key)
280

281
Look up the value of a key in the current task's task-local storage.
282
"""
283
task_local_storage(key) = task_local_storage()[key]
×
284

285
"""
286
    task_local_storage(key, value)
287

288
Assign a value to a key in the current task's task-local storage.
289
"""
290
task_local_storage(key, val) = (task_local_storage()[key] = val)
1✔
291

292
"""
293
    task_local_storage(body, key, value)
294

295
Call the function `body` with a modified task-local storage, in which `value` is assigned to
296
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
297
for emulating dynamic scoping.
298
"""
299
function task_local_storage(body::Function, key, val)
×
300
    tls = task_local_storage()
×
301
    hadkey = haskey(tls, key)
×
302
    old = get(tls, key, nothing)
×
303
    tls[key] = val
×
304
    try
×
305
        return body()
×
306
    finally
307
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
308
    end
309
end
310

311
# just wait for a task to be done, no error propagation
312
function _wait(t::Task)
152,589✔
313
    t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
152,589✔
314
    if !istaskdone(t)
152,589✔
315
        donenotify = t.donenotify::ThreadSynchronizer
2,632✔
316
        lock(donenotify)
2,632✔
317
        try
2,632✔
318
            while !istaskdone(t)
5,264✔
319
                wait(donenotify)
2,632✔
320
            end
2,632✔
321
        finally
322
            unlock(donenotify)
2,632✔
323
        end
324
    end
325
    nothing
326
end
327

328
# have `waiter` wait for `t`
329
function _wait2(t::Task, waiter::Task)
3,105✔
330
    if !istaskdone(t)
3,105✔
331
        # since _wait2 is similar to schedule, we should observe the sticky
332
        # bit, even if we don't call `schedule` with early-return below
333
        if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
3,105✔
334
            # Issue #41324
335
            # t.sticky && tid == 0 is a task that needs to be co-scheduled with
336
            # the parent task. If the parent (current_task) is not sticky we must
337
            # set it to be sticky.
338
            # XXX: Ideally we would be able to unset this
339
            current_task().sticky = true
24✔
340
            tid = Threads.threadid()
24✔
341
            ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
24✔
342
        end
343
        donenotify = t.donenotify::ThreadSynchronizer
3,105✔
344
        lock(donenotify)
3,105✔
345
        if !istaskdone(t)
3,105✔
346
            push!(donenotify.waitq, waiter)
3,150✔
347
            unlock(donenotify)
3,105✔
348
            return nothing
3,105✔
349
        else
350
            unlock(donenotify)
×
351
        end
352
    end
353
    schedule(waiter)
×
354
    nothing
355
end
356

357
"""
358
    wait(t::Task; throw=true)
359

360
Wait for a `Task` to finish.
361

362
The keyword `throw` (defaults to `true`) controls whether a failed task results
363
in an error, thrown as a [`TaskFailedException`](@ref) which wraps the failed task.
364

365
Throws a `ConcurrencyViolationError` if `t` is the currently running task, to prevent deadlocks.
366
"""
367
function wait(t::Task; throw=true)
4,920✔
368
    _wait(t)
4,920✔
369
    if throw && istaskfailed(t)
4,920✔
370
        Core.throw(TaskFailedException(t))
10✔
371
    end
372
    nothing
373
end
374

375
# Wait multiple tasks
376

377
"""
378
    waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)
379

380
Wait until at least one of the given tasks have been completed.
381

382
If `throw` is `true`, throw `CompositeException` when one of the
383
completed tasks completes with an exception.
384

385
The return value consists of two task vectors. The first one consists of
386
completed tasks, and the other consists of uncompleted tasks.
387

388
!!! warning
389
    This may scale poorly compared to writing code that uses multiple individual tasks that
390
    each runs serially, since this needs to scan the list of `tasks` each time and
391
    synchronize with each one every time this is called. Or consider using
392
    [`waitall(tasks; failfast=true)`](@ref waitall) instead.
393
"""
394
waitany(tasks; throw=true) = _wait_multiple(tasks, throw)
×
395

396
"""
397
    waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)
398

399
Wait until all the given tasks have been completed.
400

401
If `failfast` is `true`, the function will return when at least one of the
402
given tasks is finished by exception. If `throw` is `true`, throw
403
`CompositeException` when one of the completed tasks has failed.
404

405
`failfast` and `throw` keyword arguments work independently; when only
406
`throw=true` is specified, this function waits for all the tasks to complete.
407

408
The return value consists of two task vectors. The first one consists of
409
completed tasks, and the other consists of uncompleted tasks.
410
"""
411
waitall(tasks; failfast=true, throw=true) = _wait_multiple(tasks, throw, true, failfast)
×
412

413
function _wait_multiple(waiting_tasks, throwexc=false, all=false, failfast=false)
×
414
    tasks = Task[]
×
415

416
    for t in waiting_tasks
×
417
        t isa Task || error("Expected an iterator of `Task` object")
×
418
        push!(tasks, t)
×
419
    end
×
420

421
    if (all && !failfast) || length(tasks) <= 1
×
422
        exception = false
×
423
        # Force everything to finish synchronously for the case of waitall
424
        # with failfast=false
425
        for t in tasks
×
426
            _wait(t)
×
427
            exception |= istaskfailed(t)
×
428
        end
×
429
        if exception && throwexc
×
430
            exceptions = [TaskFailedException(t) for t in tasks if istaskfailed(t)]
×
431
            throw(CompositeException(exceptions))
×
432
        else
433
            return tasks, Task[]
×
434
        end
435
    end
436

437
    exception = false
×
438
    nremaining::Int = length(tasks)
×
439
    done_mask = falses(nremaining)
×
440
    for (i, t) in enumerate(tasks)
×
441
        if istaskdone(t)
×
442
            done_mask[i] = true
×
443
            exception |= istaskfailed(t)
×
444
            nremaining -= 1
×
445
        else
446
            done_mask[i] = false
×
447
        end
448
    end
×
449

450
    if nremaining == 0
×
451
        return tasks, Task[]
×
452
    elseif any(done_mask) && (!all || (failfast && exception))
×
453
        if throwexc && (!all || failfast) && exception
×
454
            exceptions = [TaskFailedException(t) for t in tasks[done_mask] if istaskfailed(t)]
×
455
            throw(CompositeException(exceptions))
×
456
        else
457
            return tasks[done_mask], tasks[.~done_mask]
×
458
        end
459
    end
460

461
    chan = Channel{Int}(Inf)
×
462
    sentinel = current_task()
×
463
    waiter_tasks = fill(sentinel, length(tasks))
×
464

465
    for (i, done) in enumerate(done_mask)
×
466
        done && continue
×
467
        t = tasks[i]
×
468
        if istaskdone(t)
×
469
            done_mask[i] = true
×
470
            exception |= istaskfailed(t)
×
471
            nremaining -= 1
×
472
            exception && failfast && break
×
473
        else
474
            waiter = @task put!(chan, i)
×
475
            waiter.sticky = false
×
476
            _wait2(t, waiter)
×
477
            waiter_tasks[i] = waiter
×
478
        end
479
    end
×
480

481
    while nremaining > 0
×
482
        i = take!(chan)
×
483
        t = tasks[i]
×
484
        waiter_tasks[i] = sentinel
×
485
        done_mask[i] = true
×
486
        exception |= istaskfailed(t)
×
487
        nremaining -= 1
×
488

489
        # stop early if requested, unless there is something immediately
490
        # ready to consume from the channel (using a race-y check)
491
        if (!all || (failfast && exception)) && !isready(chan)
×
492
            break
×
493
        end
494
    end
×
495

496
    close(chan)
×
497

498
    if nremaining == 0
×
499
        return tasks, Task[]
×
500
    else
501
        remaining_mask = .~done_mask
×
502
        for i in findall(remaining_mask)
×
503
            waiter = waiter_tasks[i]
×
504
            donenotify = tasks[i].donenotify::ThreadSynchronizer
×
505
            @lock donenotify Base.list_deletefirst!(donenotify.waitq, waiter)
×
506
        end
×
507
        done_tasks = tasks[done_mask]
×
508
        if throwexc && exception
×
509
            exceptions = [TaskFailedException(t) for t in done_tasks if istaskfailed(t)]
×
510
            throw(CompositeException(exceptions))
×
511
        else
512
            return done_tasks, tasks[remaining_mask]
×
513
        end
514
    end
515
end
516

517
"""
518
    fetch(x::Any)
519

520
Return `x`.
521
"""
522
fetch(@nospecialize x) = x
×
523

524
"""
525
    fetch(t::Task)
526

527
Wait for a [`Task`](@ref) to finish, then return its result value.
528
If the task fails with an exception, a [`TaskFailedException`](@ref) (which wraps the failed task)
529
is thrown.
530
"""
531
function fetch(t::Task)
32✔
532
    wait(t)
1,348✔
533
    return task_result(t)
1,347✔
534
end
535

536

537
## lexically-scoped waiting for multiple items
538

539
struct ScheduledAfterSyncException <: Exception
UNCOV
540
    values::Vector{Any}
×
541
end
542

UNCOV
543
function showerror(io::IO, ex::ScheduledAfterSyncException)
×
UNCOV
544
    print(io, "ScheduledAfterSyncException: ")
×
UNCOV
545
    if isempty(ex.values)
×
546
        print(io, "(no values)")
×
547
        return
×
548
    end
UNCOV
549
    show(io, ex.values[1])
×
UNCOV
550
    if length(ex.values) == 1
×
UNCOV
551
        print(io, " is")
×
UNCOV
552
    elseif length(ex.values) == 2
×
UNCOV
553
        print(io, " and one more ")
×
UNCOV
554
        print(io, nameof(typeof(ex.values[2])))
×
UNCOV
555
        print(io, " are")
×
556
    else
UNCOV
557
        print(io, " and ", length(ex.values) - 1, " more objects are")
×
558
    end
UNCOV
559
    print(io, " registered after the end of a `@sync` block")
×
560
end
561

562
function sync_end(c::Channel{Any})
153✔
563
    local c_ex
×
564
    while isready(c)
147,815✔
565
        r = take!(c)
147,662✔
566
        if isa(r, Task)
147,662✔
567
            _wait(r)
147,644✔
568
            if istaskfailed(r)
147,644✔
569
                if !@isdefined(c_ex)
×
570
                    c_ex = CompositeException()
×
571
                end
572
                push!(c_ex, TaskFailedException(r))
×
573
            end
574
        else
575
            try
18✔
576
                wait(r)
18✔
577
            catch e
578
                if !@isdefined(c_ex)
×
579
                    c_ex = CompositeException()
×
580
                end
581
                push!(c_ex, e)
×
582
            end
583
        end
584
    end
147,662✔
585
    close(c)
153✔
586

587
    # Capture all waitable objects scheduled after the end of `@sync` and
588
    # include them in the exception. This way, the user can check what was
589
    # scheduled by examining at the exception object.
590
    if isready(c)
153✔
591
        local racy
592
        for r in c
×
593
            if !@isdefined(racy)
×
594
                racy = []
×
595
            end
596
            push!(racy, r)
×
597
        end
×
598
        if @isdefined(racy)
×
599
            if !@isdefined(c_ex)
×
600
                c_ex = CompositeException()
×
601
            end
602
            # Since this is a clear programming error, show this exception first:
603
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
×
604
        end
605
    end
606

607
    if @isdefined(c_ex)
153✔
608
        throw(c_ex)
×
609
    end
610
    nothing
611
end
612

613
const sync_varname = gensym(:sync)
614

615
"""
616
    @sync
617

618
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn),
619
`Distributed.@spawnat` and `Distributed.@distributed`
620
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
621
a [`CompositeException`](@ref).
622

623
# Examples
624
```julia-repl
625
julia> Threads.nthreads()
626
4
627

628
julia> @sync begin
629
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
630
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
631
       end;
632
Thread-id 3, task 1
633
Thread-id 1, task 2
634
```
635
"""
636
macro sync(block)
11✔
637
    var = esc(sync_varname)
11✔
638
    quote
11✔
639
        let $var = Channel(Inf)
162✔
640
            v = $(esc(block))
42,228✔
641
            sync_end($var)
162✔
642
            v
101✔
643
        end
644
    end
645
end
646

647
# schedule an expression to run asynchronously
648

649
"""
650
    @async
651

652
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
653

654
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
655
constructed underlying closure. This allows you to insert the _value_ of a variable,
656
isolating the asynchronous code from changes to the variable's value in the current task.
657

658
!!! warning
659
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
660
    parallelism is required** especially in publicly distributed libraries.  This is
661
    because a use of `@async` disables the migration of the *parent* task across worker
662
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
663
    `@async` in a library function can have a large impact on the performance of very
664
    different parts of user applications.
665

666
!!! compat "Julia 1.4"
667
    Interpolating values via `\$` is available as of Julia 1.4.
668
"""
669
macro async(expr)
159✔
670
    do_async_macro(expr, __source__)
159✔
671
end
672

673
# generate the code for @async, possibly wrapping the task in something before
674
# pushing it to the wait queue.
675
function do_async_macro(expr, linenums; wrap=identity)
308✔
676
    letargs = Base._lift_one_interp!(expr)
154✔
677

678
    thunk = Base.replace_linenums!(:(()->($(esc(expr)))), linenums)
154✔
679
    var = esc(sync_varname)
154✔
680
    quote
154✔
681
        let $(letargs...)
13✔
682
            local task = Task($thunk)
87,365✔
683
            if $(Expr(:islocal, var))
86,407✔
684
                put!($var, $(wrap(:task)))
81,379✔
685
            end
686
            schedule(task)
87,365✔
687
            task
86,431✔
688
        end
689
    end
690
end
691

692
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
693
struct UnwrapTaskFailedException <: Exception
694
    task::Task
18✔
695
end
696

697
# common code for wait&fetch for UnwrapTaskFailedException
698
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
18✔
699
    try
18✔
700
        f(t.task)
18✔
701
    catch ex
702
        if ex isa TaskFailedException
×
703
            throw(ex.task.exception)
×
704
        else
705
            rethrow()
×
706
        end
707
    end
708
end
709

710
# the unwrapping for above task wrapper (gets triggered in sync_end())
711
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
18✔
712

713
# same for fetching the tasks, for convenience
714
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
715

716
# macro for running async code that doesn't throw wrapped exceptions
717
macro async_unwrap(expr)
718
    do_async_macro(expr, __source__, wrap=task->:(Base.UnwrapTaskFailedException($task)))
×
719
end
720

721
"""
722
    errormonitor(t::Task)
723

724
Print an error log to `stderr` if task `t` fails.
725

726
# Examples
727
```julia-repl
728
julia> wait(errormonitor(Threads.@spawn error("task failed")); throw = false)
729
Unhandled Task ERROR: task failed
730
Stacktrace:
731
[...]
732
```
733
"""
734
function errormonitor(t::Task)
2✔
735
    t2 = Task() do
3,008✔
736
        if istaskfailed(t)
2,427✔
737
            local errs = stderr
×
738
            try # try to display the failure atomically
×
739
                errio = IOContext(PipeBuffer(), errs::IO)
×
740
                emphasize(errio, "Unhandled Task ")
×
741
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
×
742
                write(errs, errio)
×
743
            catch
744
                try # try to display the secondary error atomically
×
745
                    errio = IOContext(PipeBuffer(), errs::IO)
×
746
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
747
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
748
                    write(errs, errio)
×
749
                    flush(errs)
×
750
                    # and then the actual error, as best we can
751
                    Core.print(Core.stderr, "while handling: ")
×
752
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
753
                catch e
754
                    # give up
755
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
756
                            " while trying to print a failed Task notice; giving up\n")
757
                end
758
            end
759
        end
760
        nothing
761
    end
762
    t2.sticky = false
3,008✔
763
    _wait2(t, t2)
3,008✔
764
    return t
2✔
765
end
766

767
# Capture interpolated variables in $() and move them to let-block
768
function _lift_one_interp!(e)
12✔
769
    letargs = Any[]  # store the new gensymed arguments
166✔
770
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
166✔
771
    letargs
4✔
772
end
773
_lift_one_interp_helper(v, _, _) = v
4✔
774
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
811✔
775
    if expr.head === :$
811✔
776
        if in_quote_context  # This $ is simply interpolating out of the quote
×
777
            # Now, we're out of the quote, so any _further_ $ is ours.
778
            in_quote_context = false
×
779
        else
780
            newarg = gensym()
×
781
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
×
782
            return newarg  # Don't recurse into the lifted $() exprs
×
783
        end
784
    elseif expr.head === :quote
811✔
785
        in_quote_context = true   # Don't try to lift $ directly out of quotes
×
786
    elseif expr.head === :macrocall
811✔
787
        return expr  # Don't recur into macro calls, since some other macros use $
45✔
788
    end
789
    for (i,e) in enumerate(expr.args)
766✔
790
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
1,971✔
791
    end
1,971✔
792
    expr
×
793
end
794

795

796
# add a wait-able object to the sync pool
797
macro sync_add(expr)
798
    var = esc(sync_varname)
799
    quote
800
        local ref = $(esc(expr))
801
        put!($var, ref)
802
        ref
803
    end
804
end
805

806
function repl_backend_task()
807
    @isdefined(active_repl_backend) || return
×
808
    backend = active_repl_backend
×
809
    isdefined(backend, :backend_task) || return
×
810
    backend_task = getfield(active_repl_backend, :backend_task)::Task
×
811
    if backend_task._state === task_state_runnable && getfield(backend, :in_eval)
×
812
        return backend_task
×
813
    end
814
    return
×
815
end
816

817
# runtime system hook called when a task finishes
818
function task_done_hook(t::Task)
160,936✔
819
    # `finish_task` sets `sigatomic` before entering this function
820
    err = istaskfailed(t)
160,936✔
821
    result = task_result(t)
160,936✔
822
    handled = false
×
823

824
    donenotify = t.donenotify
160,936✔
825
    if isa(donenotify, ThreadSynchronizer)
160,936✔
826
        lock(donenotify)
160,866✔
827
        try
160,866✔
828
            if !isempty(donenotify.waitq)
160,866✔
829
                handled = true
5,338✔
830
                notify(donenotify)
5,338✔
831
            end
832
        finally
833
            unlock(donenotify)
160,866✔
834
        end
835
    end
836

837
    if err && !handled && Threads.threadid() == 1
160,936✔
838
        if isa(result, InterruptException) && isempty(Workqueue)
5✔
839
            backend = repl_backend_task()
×
840
            backend isa Task && throwto(backend, result)
×
841
        end
842
    end
843
    # Clear sigatomic before waiting
844
    sigatomic_end()
160,936✔
845
    try
160,936✔
846
        wait() # this will not return
160,936✔
847
    catch e
848
        # If an InterruptException happens while blocked in the event loop, try handing
849
        # the exception to the REPL task since the current task is done.
850
        # issue #19467
851
        if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue)
×
852
            backend = repl_backend_task()
×
853
            backend isa Task && throwto(backend, e)
×
854
        end
855
        rethrow() # this will terminate the program
×
856
    end
857
end
858

859

860
## scheduler and work queue
861

862
mutable struct IntrusiveLinkedListSynchronized{T}
863
    queue::IntrusiveLinkedList{T}
864
    lock::Threads.SpinLock
865
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
70✔
866
end
867
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
2,403,947✔
868
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
1✔
869
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
1,693,733✔
870
    lock(W.lock)
1,693,733✔
871
    try
1,693,733✔
872
        push!(W.queue, t)
2,117,245✔
873
    finally
874
        unlock(W.lock)
1,693,733✔
875
    end
876
    return W
1,693,733✔
877
end
878
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
7✔
879
    lock(W.lock)
7✔
880
    try
7✔
881
        pushfirst!(W.queue, t)
14✔
882
    finally
883
        unlock(W.lock)
7✔
884
    end
885
    return W
7✔
886
end
887
function pop!(W::IntrusiveLinkedListSynchronized)
×
888
    lock(W.lock)
×
889
    try
×
890
        return pop!(W.queue)
×
891
    finally
892
        unlock(W.lock)
×
893
    end
894
end
895
function popfirst!(W::IntrusiveLinkedListSynchronized)
1,693,620✔
896
    lock(W.lock)
1,693,620✔
897
    try
1,693,620✔
898
        return popfirst!(W.queue)
2,117,122✔
899
    finally
900
        unlock(W.lock)
1,693,620✔
901
    end
902
end
903
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
7✔
904
    lock(W.lock)
7✔
905
    try
7✔
906
        list_deletefirst!(W.queue, t)
7✔
907
    finally
908
        unlock(W.lock)
7✔
909
    end
910
    return W
7✔
911
end
912

913
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
914
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
915
const Workqueues_lock = Threads.SpinLock()
916
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
917

918
function workqueue_for(tid::Int)
3,387,633✔
919
    qs = Workqueues
3,387,633✔
920
    if length(qs) >= tid && isassigned(qs, tid)
3,387,633✔
921
        return @inbounds qs[tid]
3,387,563✔
922
    end
923
    # slow path to allocate it
924
    @assert tid > 0
70✔
925
    l = Workqueues_lock
×
926
    @lock l begin
70✔
927
        qs = Workqueues
70✔
928
        if length(qs) < tid
70✔
929
            nt = Threads.maxthreadid()
70✔
930
            @assert tid <= nt
70✔
931
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
140✔
932
        end
933
        if !isassigned(qs, tid)
70✔
934
            @inbounds qs[tid] = StickyWorkqueue()
70✔
935
        end
936
        return @inbounds qs[tid]
70✔
937
    end
938
end
939

940
function enq_work(t::Task)
1,693,956✔
941
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
1,693,956✔
942

943
    # Sticky tasks go into their thread's work queue.
944
    if t.sticky
1,693,956✔
945
        tid = Threads.threadid(t)
1,681,787✔
946
        if tid == 0
1,681,787✔
947
            # The task is not yet stuck to a thread. Stick it to the current
948
            # thread and do the same to the parent task (the current task) so
949
            # that the tasks are correctly co-scheduled (issue #41324).
950
            # XXX: Ideally we would be able to unset this.
951
            if GC.in_finalizer()
153,769✔
952
                # The task was launched in a finalizer. There is no thread to sticky it
953
                # to, so just allow it to run anywhere as if it had been non-sticky.
954
                t.sticky = false
14✔
955
                @goto not_sticky
14✔
956
            else
957
                tid = Threads.threadid()
153,755✔
958
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
153,755✔
959
                current_task().sticky = true
153,755✔
960
            end
961
        end
962
        push!(workqueue_for(tid), t)
1,681,773✔
963
    else
964
        @label not_sticky
965
        tp = Threads.threadpool(t)
24,360✔
966
        if tp === :foreign || Threads.threadpoolsize(tp) == 1
36,543✔
967
            # There's only one thread in the task's assigned thread pool;
968
            # use its work queue.
969
            tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
23,914✔
970
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
11,960✔
971
            push!(workqueue_for(tid), t)
11,960✔
972
        else
973
            # Otherwise, put the task in the multiqueue.
974
            Partr.multiq_insert(t, t.priority)
223✔
975
            tid = 0
×
976
        end
977
    end
978
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
1,693,956✔
979
    return t
1,693,956✔
980
end
981

982
schedule(t::Task) = enq_work(t)
157,262✔
983

984
"""
985
    schedule(t::Task, [val]; error=false)
986

987
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
988
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
989

990
If a second argument `val` is provided, it will be passed to the task (via the return value of
991
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
992
the woken task.
993

994
!!! warning
995
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
996
    See [the API reference](@ref low-level-schedule-wait) for more information.
997

998
!!! warning
999
    By default tasks will have the sticky bit set to true `t.sticky`. This models the
1000
    historic default for [`@async`](@ref). Sticky tasks can only be run on the worker thread
1001
    they are first scheduled on, and when scheduled will make the task that they were scheduled
1002
    from sticky. To obtain the behavior of [`Threads.@spawn`](@ref) set the sticky
1003
    bit manually to `false`.
1004

1005
# Examples
1006
```jldoctest
1007
julia> a5() = sum(i for i in 1:1000);
1008

1009
julia> b = Task(a5);
1010

1011
julia> istaskstarted(b)
1012
false
1013

1014
julia> schedule(b);
1015

1016
julia> yield();
1017

1018
julia> istaskstarted(b)
1019
true
1020

1021
julia> istaskdone(b)
1022
true
1023
```
1024
"""
1025
function schedule(t::Task, @nospecialize(arg); error=false)
959,974✔
1026
    # schedule a task to be (re)started with the given value or exception
1027
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
480,282✔
1028
    if error
480,280✔
1029
        q = t.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, t)
1,371✔
1030
        setfield!(t, :result, arg)
1,369✔
1031
        setfield!(t, :_isexception, true)
1,369✔
1032
    else
1033
        t.queue === nothing || Base.error("schedule: Task not runnable")
478,911✔
1034
        setfield!(t, :result, arg)
478,911✔
1035
    end
1036
    enq_work(t)
480,280✔
1037
    return t
480,280✔
1038
end
1039

1040
"""
1041
    yield()
1042

1043
Switch to the scheduler to allow another scheduled task to run. A task that calls this
1044
function is still runnable, and will be restarted immediately if there are no other runnable
1045
tasks.
1046
"""
1047
function yield()
1,056,805✔
1048
    ct = current_task()
1,056,805✔
1049
    enq_work(ct)
1,056,805✔
1050
    try
1,056,805✔
1051
        wait()
1,056,805✔
1052
    catch
1053
        q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
5✔
1054
        rethrow()
5✔
1055
    end
1056
end
1057

1058
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
1,694,115✔
1059

1060
"""
1061
    yield(t::Task, arg = nothing)
1062

1063
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
1064
immediately yields to `t` before calling the scheduler.
1065

1066
Throws a `ConcurrencyViolationError` if `t` is the currently running task.
1067
"""
1068
function yield(t::Task, @nospecialize(x=nothing))
280✔
1069
    current = current_task()
536✔
1070
    t === current && throw(ConcurrencyViolationError("Cannot yield to currently running task!"))
268✔
1071
    (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable"))
268✔
1072
    t.result = x
268✔
1073
    enq_work(current)
268✔
1074
    set_next_task(t)
268✔
1075
    return try_yieldto(ensure_rescheduled)
268✔
1076
end
1077

1078
"""
1079
    yieldto(t::Task, arg = nothing)
1080

1081
Switch to the given task. The first time a task is switched to, the task's function is
1082
called with no arguments. On subsequent switches, `arg` is returned from the task's last
1083
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
1084
or scheduling in any way. Its use is discouraged.
1085
"""
1086
function yieldto(t::Task, @nospecialize(x=nothing))
4✔
1087
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
1088
    # state error instead.
1089
    if t._state === task_state_done
9✔
1090
        return x
×
1091
    elseif t._state === task_state_failed
5✔
1092
        throw(t.result)
×
1093
    end
1094
    t.result = x
5✔
1095
    set_next_task(t)
5✔
1096
    return try_yieldto(identity)
5✔
1097
end
1098

1099
function try_yieldto(undo)
1,694,115✔
1100
    try
1,694,115✔
1101
        ccall(:jl_switch, Cvoid, ())
1,694,115✔
1102
    catch
1103
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
8✔
1104
        rethrow()
8✔
1105
    end
1106
    ct = current_task()
1,532,964✔
1107
    if ct._isexception
1,532,964✔
1108
        exc = ct.result
1,371✔
1109
        ct.result = nothing
1,371✔
1110
        ct._isexception = false
1,371✔
1111
        throw(exc)
1,371✔
1112
    end
1113
    result = ct.result
1,531,593✔
1114
    ct.result = nothing
1,531,593✔
1115
    return result
1,531,593✔
1116
end
1117

1118
# yield to a task, throwing an exception in it
1119
function throwto(t::Task, @nospecialize exc)
1120
    t.result = exc
4✔
1121
    t._isexception = true
4✔
1122
    set_next_task(t)
4✔
1123
    return try_yieldto(identity)
4✔
1124
end
1125

1126
function ensure_rescheduled(othertask::Task)
7✔
1127
    ct = current_task()
7✔
1128
    W = workqueue_for(Threads.threadid())
7✔
1129
    if ct !== othertask && othertask._state === task_state_runnable
7✔
1130
        # we failed to yield to othertask
1131
        # return it to the head of a queue to be retried later
1132
        tid = Threads.threadid(othertask)
7✔
1133
        Wother = tid == 0 ? W : workqueue_for(tid)
14✔
1134
        pushfirst!(Wother, othertask)
7✔
1135
    end
1136
    # if the current task was queued,
1137
    # also need to return it to the runnable state
1138
    # before throwing an error
1139
    list_deletefirst!(W, ct)
7✔
1140
    nothing
1141
end
1142

1143
function trypoptask(W::StickyWorkqueue)
2,403,945✔
1144
    while !isempty(W)
2,403,946✔
1145
        t = popfirst!(W)
1,693,620✔
1146
        if t._state !== task_state_runnable
1,693,620✔
1147
            # assume this somehow got queued twice,
1148
            # probably broken now, but try discarding this switch and keep going
1149
            # can't throw here, because it's probably not the fault of the caller to wait
1150
            # and don't want to use print() here, because that may try to incur a task switch
1151
            ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
1✔
1152
                "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
1153
            continue
×
1154
        end
1155
        return t
1,693,619✔
1156
    end
1✔
1157
    return Partr.multiq_deletemin()
710,326✔
1158
end
1159

1160
checktaskempty = Partr.multiq_check_empty
1161

1162
@noinline function poptask(W::StickyWorkqueue)
1,693,886✔
1163
    task = trypoptask(W)
1,693,886✔
1164
    if !(task isa Task)
1,693,886✔
1165
        task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
206,762✔
1166
    end
1167
    set_next_task(task)
1,693,838✔
1168
    nothing
1169
end
1170

1171
function wait()
1,693,886✔
1172
    GC.safepoint()
1,693,886✔
1173
    W = workqueue_for(Threads.threadid())
1,693,886✔
1174
    poptask(W)
1,693,886✔
1175
    result = try_yieldto(ensure_rescheduled)
1,693,838✔
1176
    process_events()
1,531,322✔
1177
    # return when we come out of the queue
1178
    return result
1,531,322✔
1179
end
1180

1181
if Sys.iswindows()
1182
    pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
×
1183
else
1184
    pause() = ccall(:pause, Cvoid, ())
×
1185
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc