• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37755

20 Apr 2024 11:40PM UTC coverage: 87.308% (-0.09%) from 87.396%
#37755

push

local

web-flow
make `one(::AbstractMatrix)` use `similar` instead of `zeros` (#54162)

13 of 13 new or added lines in 1 file covered. (100.0%)

117 existing lines in 16 files now uncovered.

76156 of 87227 relevant lines covered (87.31%)

14990985.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.73
/base/task.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
## basic task functions and TLS
4

5
Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer())
329,270✔
6

7
# Container for a captured exception and its backtrace. Can be serialized.
8
struct CapturedException <: Exception
9
    ex::Any
10
    processed_bt::Vector{Any}
11

12
    function CapturedException(ex, bt_raw::Vector)
1✔
13
        # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace
14
        # Typically the result of a catch_backtrace()
15

16
        # Process bt_raw so that it can be safely serialized
17
        bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines.
2✔
18
        CapturedException(ex, bt_lines)
2✔
19
    end
20

21
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)
2✔
22
end
23

24
function showerror(io::IO, ce::CapturedException)
×
25
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
×
26
end
27

28
"""
29
    capture_exception(ex, bt) -> Exception
30

31
Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref).
32

33
Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during
34
the user-supplied function call.
35
"""
36
capture_exception(ex, bt) = CapturedException(ex, bt)
1✔
37

38
"""
39
    CompositeException
40

41
Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel
42
or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions.
43
For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will
44
contain a "bundle" of information from each worker indicating where and why the exception(s) occurred.
45
"""
46
struct CompositeException <: Exception
47
    exceptions::Vector{Any}
48
    CompositeException() = new(Any[])
×
49
    CompositeException(exceptions) = new(exceptions)
×
50
end
51
length(c::CompositeException) = length(c.exceptions)
×
52
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
×
53
pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex)
×
54
isempty(c::CompositeException) = isempty(c.exceptions)
×
55
iterate(c::CompositeException, state...) = iterate(c.exceptions, state...)
×
56
eltype(::Type{CompositeException}) = Any
×
57

58
function showerror(io::IO, ex::CompositeException)
×
59
    if !isempty(ex)
×
60
        showerror(io, ex.exceptions[1])
×
61
        remaining = length(ex) - 1
×
62
        if remaining > 0
×
63
            print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n")
×
64
        end
65
    else
66
        print(io, "CompositeException()\n")
×
67
    end
68
end
69

70
"""
71
    TaskFailedException
72

73
This exception is thrown by a [`wait(t)`](@ref) call when task `t` fails.
74
`TaskFailedException` wraps the failed task `t`.
75
"""
76
struct TaskFailedException <: Exception
77
    task::Task
125✔
78
end
79

80
function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true)
5✔
81
    print(io, "TaskFailedException")
2✔
82
    if bt !== nothing && backtrace
2✔
83
        show_backtrace(io, bt)
1✔
84
    end
85
    println(io)
2✔
86
    printstyled(io, "\n    nested task error: ", color=error_color())
4✔
87
    show_task_exception(io, ex.task)
2✔
88
end
89

90
function show_task_exception(io::IO, t::Task; indent = true)
4✔
91
    stack = current_exceptions(t)
2✔
92
    b = IOBuffer()
2✔
93
    if isempty(stack)
2✔
94
        # exception stack buffer not available; probably a serialized task
95
        showerror(IOContext(b, io), t.result)
×
96
    else
97
        show_exception_stack(IOContext(b, io), stack)
2✔
98
    end
99
    str = String(take!(b))
4✔
100
    if indent
2✔
101
        str = replace(str, "\n" => "\n    ")
2✔
102
    end
103
    print(io, str)
2✔
104
end
105

106
function show(io::IO, t::Task)
×
107
    state = t.state
×
108
    state_str = "$state" * ((state == :runnable && istaskstarted(t)) ? ", started" : "")
×
109
    print(io, "Task ($state_str) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
×
110
end
111

112
"""
113
    @task
114

115
Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only
116
creates a task, and does not run it.
117

118
# Examples
119
```jldoctest
120
julia> a1() = sum(i for i in 1:1000);
121

122
julia> b = @task a1();
123

124
julia> istaskstarted(b)
125
false
126

127
julia> schedule(b);
128

129
julia> yield();
130

131
julia> istaskdone(b)
132
true
133
```
134
"""
135
macro task(ex)
19✔
136
    thunk = Base.replace_linenums!(:(()->$(esc(ex))), __source__)
19✔
137
    :(Task($thunk))
19✔
138
end
139

140
"""
141
    current_task()
142

143
Get the currently running [`Task`](@ref).
144
"""
145
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
424,574,780✔
146

147
# task states
148

149
const task_state_runnable = UInt8(0)
150
const task_state_done     = UInt8(1)
151
const task_state_failed   = UInt8(2)
152

153
const _state_index = findfirst(==(:_state), fieldnames(Task))
154
@eval function load_state_acquire(t)
7✔
155
    # TODO: Replace this by proper atomic operations when available
156
    @GC.preserve t llvmcall($("""
487,381✔
157
        %rv = load atomic i8, i8* %0 acquire, align 8
158
        ret i8 %rv
159
        """), UInt8, Tuple{Ptr{UInt8}},
160
        Ptr{UInt8}(pointer_from_objref(t) + fieldoffset(Task, _state_index)))
161
end
162

163
@inline function getproperty(t::Task, field::Symbol)
1,401✔
164
    if field === :state
4,979✔
165
        # TODO: this field name should be deprecated in 2.0
166
        st = load_state_acquire(t)
8✔
167
        if st === task_state_runnable
8✔
168
            return :runnable
×
169
        elseif st === task_state_done
8✔
170
            return :done
5✔
171
        elseif st === task_state_failed
3✔
172
            return :failed
3✔
173
        else
174
            @assert false
×
175
        end
176
    elseif field === :backtrace
4,971✔
177
        # TODO: this field name should be deprecated in 2.0
178
        return current_exceptions(t)[end][2]
×
179
    elseif field === :exception
4,971✔
180
        # TODO: this field name should be deprecated in 2.0
181
        return t._isexception ? t.result : nothing
5✔
182
    elseif field === :scope
4,966✔
183
        error("Querying `scope` is disallowed. Use `current_scope` instead.")
×
184
    else
185
        return getfield(t, field)
218,923,336✔
186
    end
187
end
188

189
@inline function setproperty!(t::Task, field::Symbol, @nospecialize(v))
2,771✔
190
    if field === :scope
276,125,282✔
191
        istaskstarted(t) && error("Setting scope on a started task directly is disallowed.")
×
192
    end
193
    return @invoke setproperty!(t::Any, field::Symbol, v::Any)
283,205,554✔
194
end
195

196
"""
197
    istaskdone(t::Task) -> Bool
198

199
Determine whether a task has exited.
200

201
# Examples
202
```jldoctest
203
julia> a2() = sum(i for i in 1:1000);
204

205
julia> b = Task(a2);
206

207
julia> istaskdone(b)
208
false
209

210
julia> schedule(b);
211

212
julia> yield();
213

214
julia> istaskdone(b)
215
true
216
```
217
"""
218
istaskdone(t::Task) = load_state_acquire(t) !== task_state_runnable
169,639✔
219

220
"""
221
    istaskstarted(t::Task) -> Bool
222

223
Determine whether a task has started executing.
224

225
# Examples
226
```jldoctest
227
julia> a3() = sum(i for i in 1:1000);
228

229
julia> b = Task(a3);
230

231
julia> istaskstarted(b)
232
false
233
```
234
"""
235
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0
3✔
236

237
"""
238
    istaskfailed(t::Task) -> Bool
239

240
Determine whether a task has exited because an exception was thrown.
241

242
# Examples
243
```jldoctest
244
julia> a4() = error("task failed");
245

246
julia> b = Task(a4);
247

248
julia> istaskfailed(b)
249
false
250

251
julia> schedule(b);
252

253
julia> yield();
254

255
julia> istaskfailed(b)
256
true
257
```
258

259
!!! compat "Julia 1.3"
260
    This function requires at least Julia 1.3.
261
"""
262
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)
317,727✔
263

264
Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
1,634,602✔
265
function Threads.threadpool(t::Task)
1✔
266
    tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
6,163✔
267
    return Threads._tpid_to_sym(tpid)
12,317✔
268
end
269

270
task_result(t::Task) = t.result
164,301✔
271

272
task_local_storage() = get_task_tls(current_task())
68,203,556✔
273
function get_task_tls(t::Task)
274
    if t.storage === nothing
68,203,556✔
275
        t.storage = IdDict()
677✔
276
    end
277
    return (t.storage)::IdDict{Any,Any}
68,203,556✔
278
end
279

280
"""
281
    task_local_storage(key)
282

283
Look up the value of a key in the current task's task-local storage.
284
"""
285
task_local_storage(key) = task_local_storage()[key]
×
286

287
"""
288
    task_local_storage(key, value)
289

290
Assign a value to a key in the current task's task-local storage.
291
"""
292
task_local_storage(key, val) = (task_local_storage()[key] = val)
1✔
293

294
"""
295
    task_local_storage(body, key, value)
296

297
Call the function `body` with a modified task-local storage, in which `value` is assigned to
298
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
299
for emulating dynamic scoping.
300
"""
301
function task_local_storage(body::Function, key, val)
×
302
    tls = task_local_storage()
×
303
    hadkey = haskey(tls, key)
×
304
    old = get(tls, key, nothing)
×
305
    tls[key] = val
×
306
    try
×
307
        return body()
×
308
    finally
309
        hadkey ? (tls[key] = old) : delete!(tls, key)
×
310
    end
311
end
312

313
# just wait for a task to be done, no error propagation
314
function _wait(t::Task)
153,506✔
315
    if !istaskdone(t)
153,506✔
316
        donenotify = t.donenotify::ThreadSynchronizer
3,647✔
317
        lock(donenotify)
3,647✔
318
        try
3,647✔
319
            while !istaskdone(t)
7,294✔
320
                wait(donenotify)
3,647✔
321
            end
3,647✔
322
        finally
323
            unlock(donenotify)
3,647✔
324
        end
325
    end
326
    nothing
327
end
328

329
# have `waiter` wait for `t`
330
function _wait2(t::Task, waiter::Task)
3,043✔
331
    if !istaskdone(t)
3,043✔
332
        # since _wait2 is similar to schedule, we should observe the sticky
333
        # bit, even if we don't call `schedule` with early-return below
334
        if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer()
3,043✔
335
            # Issue #41324
336
            # t.sticky && tid == 0 is a task that needs to be co-scheduled with
337
            # the parent task. If the parent (current_task) is not sticky we must
338
            # set it to be sticky.
339
            # XXX: Ideally we would be able to unset this
340
            current_task().sticky = true
24✔
341
            tid = Threads.threadid()
24✔
342
            ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1)
24✔
343
        end
344
        donenotify = t.donenotify::ThreadSynchronizer
3,043✔
345
        lock(donenotify)
3,043✔
346
        if !istaskdone(t)
3,043✔
347
            push!(donenotify.waitq, waiter)
3,088✔
348
            unlock(donenotify)
3,043✔
349
            return nothing
3,043✔
350
        else
351
            unlock(donenotify)
×
352
        end
353
    end
354
    schedule(waiter)
×
355
    nothing
356
end
357

358
"""
359
    wait(t::Task; throw=true)
360

361
Wait for a `Task` to finish.
362

363
The keyword `throw` (defaults to `true`) controls whether a failed task results
364
in an error, thrown as a [`TaskFailedException`](@ref) which wraps the failed task.
365

366
Throws a `ConcurrencyViolationError` if `t` is the currently running task, to prevent deadlocks.
367
"""
368
function wait(t::Task; throw=true)
11,383✔
369
    t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
5,552✔
370
    _wait(t)
5,552✔
371
    if throw && istaskfailed(t)
5,552✔
372
        Core.throw(TaskFailedException(t))
12✔
373
    end
374
    nothing
375
end
376

377
# Wait multiple tasks
378

379
"""
380
    waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)
381

382
Wait until at least one of the given tasks have been completed.
383

384
If `throw` is `true`, throw `CompositeException` when one of the
385
completed tasks completes with an exception.
386

387
The return value consists of two task vectors. The first one consists of
388
completed tasks, and the other consists of uncompleted tasks.
389

390
!!! warning
391
    This may scale poorly compared to writing code that uses multiple individual tasks that
392
    each runs serially, since this needs to scan the list of `tasks` each time and
393
    synchronize with each one every time this is called. Or consider using
394
    [`waitall(tasks; failfast=true)`](@ref waitall) instead.
395
"""
396
waitany(tasks; throw=true) = _wait_multiple(tasks, throw)
×
397

398
"""
399
    waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)
400

401
Wait until all the given tasks have been completed.
402

403
If `failfast` is `true`, the function will return when at least one of the
404
given tasks is finished by exception. If `throw` is `true`, throw
405
`CompositeException` when one of the completed tasks has failed.
406

407
`failfast` and `throw` keyword arguments work independently; when only
408
`throw=true` is specified, this function waits for all the tasks to complete.
409

410
The return value consists of two task vectors. The first one consists of
411
completed tasks, and the other consists of uncompleted tasks.
412
"""
413
waitall(tasks; failfast=true, throw=true) = _wait_multiple(tasks, throw, true, failfast)
×
414

415
function _wait_multiple(waiting_tasks, throwexc=false, all=false, failfast=false)
×
416
    tasks = Task[]
×
417

418
    for t in waiting_tasks
×
419
        t isa Task || error("Expected an iterator of `Task` object")
×
420
        push!(tasks, t)
×
421
    end
×
422

423
    if (all && !failfast) || length(tasks) <= 1
×
424
        exception = false
×
425
        # Force everything to finish synchronously for the case of waitall
426
        # with failfast=false
427
        for t in tasks
×
428
            _wait(t)
×
429
            exception |= istaskfailed(t)
×
430
        end
×
431
        if exception && throwexc
×
432
            exceptions = [TaskFailedException(t) for t in tasks if istaskfailed(t)]
×
433
            throw(CompositeException(exceptions))
×
434
        else
435
            return tasks, Task[]
×
436
        end
437
    end
438

439
    exception = false
×
440
    nremaining::Int = length(tasks)
×
441
    done_mask = falses(nremaining)
×
442
    for (i, t) in enumerate(tasks)
×
443
        if istaskdone(t)
×
444
            done_mask[i] = true
×
445
            exception |= istaskfailed(t)
×
446
            nremaining -= 1
×
447
        else
448
            done_mask[i] = false
×
449
        end
450
    end
×
451

452
    if nremaining == 0
×
453
        return tasks, Task[]
×
454
    elseif any(done_mask) && (!all || (failfast && exception))
×
455
        if throwexc && (!all || failfast) && exception
×
456
            exceptions = [TaskFailedException(t) for t in tasks[done_mask] if istaskfailed(t)]
×
457
            throw(CompositeException(exceptions))
×
458
        else
459
            return tasks[done_mask], tasks[.~done_mask]
×
460
        end
461
    end
462

463
    chan = Channel{Int}(Inf)
×
464
    sentinel = current_task()
×
465
    waiter_tasks = fill(sentinel, length(tasks))
×
466

467
    for (i, done) in enumerate(done_mask)
×
468
        done && continue
×
469
        t = tasks[i]
×
470
        if istaskdone(t)
×
471
            done_mask[i] = true
×
472
            exception |= istaskfailed(t)
×
473
            nremaining -= 1
×
474
            exception && failfast && break
×
475
        else
476
            waiter = @task put!(chan, i)
×
477
            waiter.sticky = false
×
478
            _wait2(t, waiter)
×
479
            waiter_tasks[i] = waiter
×
480
        end
481
    end
×
482

483
    while nremaining > 0
×
484
        i = take!(chan)
×
485
        t = tasks[i]
×
486
        waiter_tasks[i] = sentinel
×
487
        done_mask[i] = true
×
488
        exception |= istaskfailed(t)
×
489
        nremaining -= 1
×
490

491
        # stop early if requested, unless there is something immediately
492
        # ready to consume from the channel (using a race-y check)
493
        if (!all || (failfast && exception)) && !isready(chan)
×
494
            break
×
495
        end
496
    end
×
497

498
    close(chan)
×
499

500
    if nremaining == 0
×
501
        return tasks, Task[]
×
502
    else
503
        remaining_mask = .~done_mask
×
504
        for i in findall(remaining_mask)
×
505
            waiter = waiter_tasks[i]
×
506
            donenotify = tasks[i].donenotify::ThreadSynchronizer
×
507
            @lock donenotify Base.list_deletefirst!(donenotify.waitq, waiter)
×
508
        end
×
509
        done_tasks = tasks[done_mask]
×
510
        if throwexc && exception
×
511
            exceptions = [TaskFailedException(t) for t in done_tasks if istaskfailed(t)]
×
512
            throw(CompositeException(exceptions))
×
513
        else
514
            return done_tasks, tasks[remaining_mask]
×
515
        end
516
    end
517
end
518

519
"""
520
    fetch(x::Any)
521

522
Return `x`.
523
"""
524
fetch(@nospecialize x) = x
×
525

526
"""
527
    fetch(t::Task)
528

529
Wait for a [`Task`](@ref) to finish, then return its result value.
530
If the task fails with an exception, a [`TaskFailedException`](@ref) (which wraps the failed task)
531
is thrown.
532
"""
533
function fetch(t::Task)
52✔
534
    wait(t)
2,501✔
535
    return task_result(t)
2,500✔
536
end
537

538

539
## lexically-scoped waiting for multiple items
540

541
struct ScheduledAfterSyncException <: Exception
542
    values::Vector{Any}
3✔
543
end
544

545
function showerror(io::IO, ex::ScheduledAfterSyncException)
3✔
546
    print(io, "ScheduledAfterSyncException: ")
3✔
547
    if isempty(ex.values)
3✔
548
        print(io, "(no values)")
×
549
        return
×
550
    end
551
    show(io, ex.values[1])
3✔
552
    if length(ex.values) == 1
3✔
553
        print(io, " is")
1✔
554
    elseif length(ex.values) == 2
2✔
555
        print(io, " and one more ")
1✔
556
        print(io, nameof(typeof(ex.values[2])))
1✔
557
        print(io, " are")
1✔
558
    else
559
        print(io, " and ", length(ex.values) - 1, " more objects are")
1✔
560
    end
561
    print(io, " registered after the end of a `@sync` block")
3✔
562
end
563

564
function sync_end(c::Channel{Any})
187✔
565
    local c_ex
×
566
    while isready(c)
147,930✔
567
        r = take!(c)
147,743✔
568
        if isa(r, Task)
147,743✔
569
            _wait(r)
147,719✔
570
            if istaskfailed(r)
147,719✔
571
                if !@isdefined(c_ex)
×
572
                    c_ex = CompositeException()
×
573
                end
574
                push!(c_ex, TaskFailedException(r))
×
575
            end
576
        else
577
            try
24✔
578
                wait(r)
24✔
579
            catch e
580
                if !@isdefined(c_ex)
×
581
                    c_ex = CompositeException()
×
582
                end
583
                push!(c_ex, e)
×
584
            end
585
        end
586
    end
147,743✔
587
    close(c)
187✔
588

589
    # Capture all waitable objects scheduled after the end of `@sync` and
590
    # include them in the exception. This way, the user can check what was
591
    # scheduled by examining at the exception object.
592
    if isready(c)
187✔
593
        local racy
594
        for r in c
×
595
            if !@isdefined(racy)
×
596
                racy = []
×
597
            end
598
            push!(racy, r)
×
599
        end
×
600
        if @isdefined(racy)
×
601
            if !@isdefined(c_ex)
×
602
                c_ex = CompositeException()
×
603
            end
604
            # Since this is a clear programming error, show this exception first:
605
            pushfirst!(c_ex, ScheduledAfterSyncException(racy))
×
606
        end
607
    end
608

609
    if @isdefined(c_ex)
187✔
610
        throw(c_ex)
×
611
    end
612
    nothing
613
end
614

615
const sync_varname = gensym(:sync)
616

617
"""
618
    @sync
619

620
Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn),
621
`Distributed.@spawnat` and `Distributed.@distributed`
622
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
623
a [`CompositeException`](@ref).
624

625
# Examples
626
```julia-repl
627
julia> Threads.nthreads()
628
4
629

630
julia> @sync begin
631
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1")
632
           Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2")
633
       end;
634
Thread-id 3, task 1
635
Thread-id 1, task 2
636
```
637
"""
638
macro sync(block)
14✔
639
    var = esc(sync_varname)
14✔
640
    quote
14✔
641
        let $var = Channel(Inf)
161✔
642
            v = $(esc(block))
42,207✔
643
            sync_end($var)
161✔
644
            v
101✔
645
        end
646
    end
647
end
648

649
# schedule an expression to run asynchronously
650

651
"""
652
    @async
653

654
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
655

656
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
657
constructed underlying closure. This allows you to insert the _value_ of a variable,
658
isolating the asynchronous code from changes to the variable's value in the current task.
659

660
!!! warning
661
    It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no
662
    parallelism is required** especially in publicly distributed libraries.  This is
663
    because a use of `@async` disables the migration of the *parent* task across worker
664
    threads in the current implementation of Julia.  Thus, seemingly innocent use of
665
    `@async` in a library function can have a large impact on the performance of very
666
    different parts of user applications.
667

668
!!! compat "Julia 1.4"
669
    Interpolating values via `\$` is available as of Julia 1.4.
670
"""
671
macro async(expr)
168✔
672
    do_async_macro(expr, __source__)
168✔
673
end
674

675
# generate the code for @async, possibly wrapping the task in something before
676
# pushing it to the wait queue.
677
function do_async_macro(expr, linenums; wrap=identity)
326✔
678
    letargs = Base._lift_one_interp!(expr)
163✔
679

680
    thunk = Base.replace_linenums!(:(()->($(esc(expr)))), linenums)
163✔
681
    var = esc(sync_varname)
163✔
682
    quote
163✔
683
        let $(letargs...)
9✔
684
            local task = Task($thunk)
88,478✔
685
            if $(Expr(:islocal, var))
87,434✔
686
                put!($var, $(wrap(:task)))
81,441✔
687
            end
688
            schedule(task)
88,478✔
689
            task
87,458✔
690
        end
691
    end
692
end
693

694
# task wrapper that doesn't create exceptions wrapped in TaskFailedException
695
struct UnwrapTaskFailedException <: Exception
696
    task::Task
24✔
697
end
698

699
# common code for wait&fetch for UnwrapTaskFailedException
700
function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException)
24✔
701
    try
24✔
702
        f(t.task)
24✔
703
    catch ex
704
        if ex isa TaskFailedException
×
705
            throw(ex.task.exception)
×
706
        else
707
            rethrow()
×
708
        end
709
    end
710
end
711

712
# the unwrapping for above task wrapper (gets triggered in sync_end())
713
wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t)
24✔
714

715
# same for fetching the tasks, for convenience
716
fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t)
×
717

718
# macro for running async code that doesn't throw wrapped exceptions
719
macro async_unwrap(expr)
720
    do_async_macro(expr, __source__, wrap=task->:(Base.UnwrapTaskFailedException($task)))
×
721
end
722

723
"""
724
    errormonitor(t::Task)
725

726
Print an error log to `stderr` if task `t` fails.
727

728
# Examples
729
```julia-repl
730
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
731
Unhandled Task ERROR: task failed
732
Stacktrace:
733
[...]
734
```
735
"""
736
function errormonitor(t::Task)
2✔
737
    t2 = Task() do
2,883✔
738
        if istaskfailed(t)
2,379✔
739
            local errs = stderr
×
740
            try # try to display the failure atomically
×
741
                errio = IOContext(PipeBuffer(), errs::IO)
×
742
                emphasize(errio, "Unhandled Task ")
×
743
                display_error(errio, scrub_repl_backtrace(current_exceptions(t)))
×
744
                write(errs, errio)
×
745
            catch
746
                try # try to display the secondary error atomically
×
747
                    errio = IOContext(PipeBuffer(), errs::IO)
×
748
                    print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ")
×
749
                    display_error(errio, scrub_repl_backtrace(current_exceptions()))
×
750
                    write(errs, errio)
×
751
                    flush(errs)
×
752
                    # and then the actual error, as best we can
753
                    Core.print(Core.stderr, "while handling: ")
×
754
                    Core.println(Core.stderr, current_exceptions(t)[end][1])
×
755
                catch e
756
                    # give up
757
                    Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name,
×
758
                            " while trying to print a failed Task notice; giving up\n")
759
                end
760
            end
761
        end
762
        nothing
763
    end
764
    t2.sticky = false
2,883✔
765
    _wait2(t, t2)
2,883✔
766
    return t
2✔
767
end
768

769
# Capture interpolated variables in $() and move them to let-block
770
function _lift_one_interp!(e)
12✔
771
    letargs = Any[]  # store the new gensymed arguments
175✔
772
    _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
175✔
773
    letargs
5✔
774
end
775
_lift_one_interp_helper(v, _, _) = v
5✔
776
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
783✔
777
    if expr.head === :$
783✔
778
        if in_quote_context  # This $ is simply interpolating out of the quote
×
779
            # Now, we're out of the quote, so any _further_ $ is ours.
780
            in_quote_context = false
×
781
        else
782
            newarg = gensym()
×
783
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
×
784
            return newarg  # Don't recurse into the lifted $() exprs
×
785
        end
786
    elseif expr.head === :quote
783✔
787
        in_quote_context = true   # Don't try to lift $ directly out of quotes
×
788
    elseif expr.head === :macrocall
783✔
789
        return expr  # Don't recur into macro calls, since some other macros use $
49✔
790
    end
791
    for (i,e) in enumerate(expr.args)
734✔
792
        expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
1,893✔
793
    end
1,893✔
794
    expr
×
795
end
796

797

798
# add a wait-able object to the sync pool
799
macro sync_add(expr)
800
    var = esc(sync_varname)
801
    quote
802
        local ref = $(esc(expr))
803
        put!($var, ref)
804
        ref
805
    end
806
end
807

808
# runtime system hook called when a task finishes
809
function task_done_hook(t::Task)
161,801✔
810
    # `finish_task` sets `sigatomic` before entering this function
811
    err = istaskfailed(t)
161,801✔
812
    result = task_result(t)
161,801✔
813
    handled = false
×
814

815
    donenotify = t.donenotify
161,801✔
816
    if isa(donenotify, ThreadSynchronizer)
161,801✔
817
        lock(donenotify)
161,732✔
818
        try
161,732✔
819
            if !isempty(donenotify.waitq)
161,732✔
820
                handled = true
6,301✔
821
                notify(donenotify)
6,301✔
822
            end
823
        finally
824
            unlock(donenotify)
161,732✔
825
        end
826
    end
827

828
    if err && !handled && Threads.threadid() == 1
161,801✔
829
        if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
4✔
830
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
831
            active_repl_backend.in_eval
832
            throwto(active_repl_backend.backend_task, result) # this terminates the task
×
833
        end
834
    end
835
    # Clear sigatomic before waiting
836
    sigatomic_end()
161,801✔
837
    try
161,801✔
838
        wait() # this will not return
161,801✔
839
    catch e
840
        # If an InterruptException happens while blocked in the event loop, try handing
841
        # the exception to the REPL task since the current task is done.
842
        # issue #19467
843
        if Threads.threadid() == 1 &&
×
844
            isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
845
            active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
846
            active_repl_backend.in_eval
847
            throwto(active_repl_backend.backend_task, e)
×
848
        else
849
            rethrow()
×
850
        end
851
    end
852
end
853

854

855
## scheduler and work queue
856

857
mutable struct IntrusiveLinkedListSynchronized{T}
858
    queue::IntrusiveLinkedList{T}
859
    lock::Threads.SpinLock
860
    IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock())
69✔
861
end
862
isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue)
1,781,635✔
863
length(W::IntrusiveLinkedListSynchronized) = length(W.queue)
1✔
864
function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
1,370,686✔
865
    lock(W.lock)
1,370,686✔
866
    try
1,370,686✔
867
        push!(W.queue, t)
1,847,532✔
868
    finally
869
        unlock(W.lock)
1,370,686✔
870
    end
871
    return W
1,370,686✔
872
end
873
function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5✔
874
    lock(W.lock)
5✔
875
    try
5✔
876
        pushfirst!(W.queue, t)
10✔
877
    finally
878
        unlock(W.lock)
5✔
879
    end
880
    return W
5✔
881
end
882
function pop!(W::IntrusiveLinkedListSynchronized)
×
883
    lock(W.lock)
×
884
    try
×
885
        return pop!(W.queue)
×
886
    finally
887
        unlock(W.lock)
×
888
    end
889
end
890
function popfirst!(W::IntrusiveLinkedListSynchronized)
1,370,573✔
891
    lock(W.lock)
1,370,573✔
892
    try
1,370,573✔
893
        return popfirst!(W.queue)
1,847,416✔
894
    finally
895
        unlock(W.lock)
1,370,573✔
896
    end
897
end
898
function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
5✔
899
    lock(W.lock)
5✔
900
    try
5✔
901
        list_deletefirst!(W.queue, t)
5✔
902
    finally
903
        unlock(W.lock)
5✔
904
    end
905
    return W
5✔
906
end
907

908
const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
909
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
910
const Workqueues_lock = Threads.SpinLock()
911
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
912

913
function workqueue_for(tid::Int)
2,741,510✔
914
    qs = Workqueues
2,741,510✔
915
    if length(qs) >= tid && isassigned(qs, tid)
2,741,510✔
916
        return @inbounds qs[tid]
2,741,441✔
917
    end
918
    # slow path to allocate it
919
    @assert tid > 0
69✔
920
    l = Workqueues_lock
×
921
    @lock l begin
69✔
922
        qs = Workqueues
69✔
923
        if length(qs) < tid
69✔
924
            nt = Threads.maxthreadid()
69✔
925
            @assert tid <= nt
69✔
926
            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
138✔
927
        end
928
        if !isassigned(qs, tid)
69✔
929
            @inbounds qs[tid] = StickyWorkqueue()
69✔
930
        end
931
        return @inbounds qs[tid]
69✔
932
    end
933
end
934

935
function enq_work(t::Task)
1,370,906✔
936
    (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
1,370,906✔
937

938
    # Sticky tasks go into their thread's work queue.
939
    if t.sticky
1,370,906✔
940
        tid = Threads.threadid(t)
1,364,765✔
941
        if tid == 0
1,364,765✔
942
            # The task is not yet stuck to a thread. Stick it to the current
943
            # thread and do the same to the parent task (the current task) so
944
            # that the tasks are correctly co-scheduled (issue #41324).
945
            # XXX: Ideally we would be able to unset this.
946
            if GC.in_finalizer()
154,946✔
947
                # The task was launched in a finalizer. There is no thread to sticky it
948
                # to, so just allow it to run anywhere as if it had been non-sticky.
949
                t.sticky = false
18✔
950
                @goto not_sticky
18✔
951
            else
952
                tid = Threads.threadid()
154,928✔
953
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
154,928✔
954
                current_task().sticky = true
154,928✔
955
            end
956
        end
957
        push!(workqueue_for(tid), t)
1,364,747✔
958
    else
959
        @label not_sticky
960
        tp = Threads.threadpool(t)
12,312✔
961
        if tp === :foreign || Threads.threadpoolsize(tp) == 1
18,471✔
962
            # There's only one thread in the task's assigned thread pool;
963
            # use its work queue.
964
            tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1
11,872✔
965
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
5,939✔
966
            push!(workqueue_for(tid), t)
5,939✔
967
        else
968
            # Otherwise, put the task in the multiqueue.
969
            Partr.multiq_insert(t, t.priority)
220✔
970
            tid = 0
×
971
        end
972
    end
973
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
1,370,906✔
974
    return t
1,370,906✔
975
end
976

977
schedule(t::Task) = enq_work(t)
158,258✔
978

979
"""
980
    schedule(t::Task, [val]; error=false)
981

982
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
983
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
984

985
If a second argument `val` is provided, it will be passed to the task (via the return value of
986
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
987
the woken task.
988

989
!!! warning
990
    It is incorrect to use `schedule` on an arbitrary `Task` that has already been started.
991
    See [the API reference](@ref low-level-schedule-wait) for more information.
992

993
# Examples
994
```jldoctest
995
julia> a5() = sum(i for i in 1:1000);
996

997
julia> b = Task(a5);
998

999
julia> istaskstarted(b)
1000
false
1001

1002
julia> schedule(b);
1003

1004
julia> yield();
1005

1006
julia> istaskstarted(b)
1007
true
1008

1009
julia> istaskdone(b)
1010
true
1011
```
1012
"""
1013
function schedule(t::Task, @nospecialize(arg); error=false)
885,181✔
1014
    # schedule a task to be (re)started with the given value or exception
1015
    t._state === task_state_runnable || Base.error("schedule: Task not runnable")
442,726✔
1016
    if error
442,724✔
1017
        t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t)
1,384✔
1018
        setfield!(t, :result, arg)
1,382✔
1019
        setfield!(t, :_isexception, true)
1,382✔
1020
    else
1021
        t.queue === nothing || Base.error("schedule: Task not runnable")
441,342✔
1022
        setfield!(t, :result, arg)
441,342✔
1023
    end
1024
    enq_work(t)
442,724✔
1025
    return t
442,724✔
1026
end
1027

1028
"""
1029
    yield()
1030

1031
Switch to the scheduler to allow another scheduled task to run. A task that calls this
1032
function is still runnable, and will be restarted immediately if there are no other runnable
1033
tasks.
1034
"""
1035
function yield()
770,418✔
1036
    ct = current_task()
770,418✔
1037
    enq_work(ct)
770,418✔
1038
    try
770,418✔
1039
        wait()
770,418✔
1040
    catch
1041
        ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
5✔
1042
        rethrow()
5✔
1043
    end
1044
end
1045

1046
@inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t)
1,371,038✔
1047

1048
"""
1049
    yield(t::Task, arg = nothing)
1050

1051
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
1052
immediately yields to `t` before calling the scheduler.
1053

1054
Throws a `ConcurrencyViolationError` if `t` is the currently running task.
1055
"""
1056
function yield(t::Task, @nospecialize(x=nothing))
253✔
1057
    current = current_task()
482✔
1058
    t === current && throw(ConcurrencyViolationError("Cannot yield to currently running task!"))
241✔
1059
    (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable"))
241✔
1060
    t.result = x
241✔
1061
    enq_work(current)
241✔
1062
    set_next_task(t)
241✔
1063
    return try_yieldto(ensure_rescheduled)
241✔
1064
end
1065

1066
"""
1067
    yieldto(t::Task, arg = nothing)
1068

1069
Switch to the given task. The first time a task is switched to, the task's function is
1070
called with no arguments. On subsequent switches, `arg` is returned from the task's last
1071
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
1072
or scheduling in any way. Its use is discouraged.
1073
"""
1074
function yieldto(t::Task, @nospecialize(x=nothing))
4✔
1075
    # TODO: these are legacy behaviors; these should perhaps be a scheduler
1076
    # state error instead.
1077
    if t._state === task_state_done
9✔
1078
        return x
×
1079
    elseif t._state === task_state_failed
5✔
1080
        throw(t.result)
×
1081
    end
1082
    t.result = x
5✔
1083
    set_next_task(t)
5✔
1084
    return try_yieldto(identity)
5✔
1085
end
1086

1087
function try_yieldto(undo)
1,371,038✔
1088
    try
1,371,038✔
1089
        ccall(:jl_switch, Cvoid, ())
1,371,038✔
1090
    catch
1091
        undo(ccall(:jl_get_next_task, Ref{Task}, ()))
6✔
1092
        rethrow()
6✔
1093
    end
1094
    ct = current_task()
1,209,017✔
1095
    if ct._isexception
1,209,017✔
1096
        exc = ct.result
1,384✔
1097
        ct.result = nothing
1,384✔
1098
        ct._isexception = false
1,384✔
1099
        throw(exc)
1,384✔
1100
    end
1101
    result = ct.result
1,207,633✔
1102
    ct.result = nothing
1,207,633✔
1103
    return result
1,207,633✔
1104
end
1105

1106
# yield to a task, throwing an exception in it
1107
function throwto(t::Task, @nospecialize exc)
1108
    t.result = exc
4✔
1109
    t._isexception = true
4✔
1110
    set_next_task(t)
4✔
1111
    return try_yieldto(identity)
4✔
1112
end
1113

1114
function ensure_rescheduled(othertask::Task)
5✔
1115
    ct = current_task()
5✔
1116
    W = workqueue_for(Threads.threadid())
5✔
1117
    if ct !== othertask && othertask._state === task_state_runnable
5✔
1118
        # we failed to yield to othertask
1119
        # return it to the head of a queue to be retried later
1120
        tid = Threads.threadid(othertask)
5✔
1121
        Wother = tid == 0 ? W : workqueue_for(tid)
10✔
1122
        pushfirst!(Wother, othertask)
5✔
1123
    end
1124
    # if the current task was queued,
1125
    # also need to return it to the runnable state
1126
    # before throwing an error
1127
    list_deletefirst!(W, ct)
5✔
1128
    nothing
1129
end
1130

1131
function trypoptask(W::StickyWorkqueue)
1,781,633✔
1132
    while !isempty(W)
1,781,634✔
1133
        t = popfirst!(W)
1,370,573✔
1134
        if t._state !== task_state_runnable
1,370,573✔
1135
            # assume this somehow got queued twice,
1136
            # probably broken now, but try discarding this switch and keep going
1137
            # can't throw here, because it's probably not the fault of the caller to wait
1138
            # and don't want to use print() here, because that may try to incur a task switch
1139
            ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
1✔
1140
                "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n")
1141
            continue
×
1142
        end
1143
        return t
1,370,572✔
1144
    end
1✔
1145
    return Partr.multiq_deletemin()
411,061✔
1146
end
1147

1148
checktaskempty = Partr.multiq_check_empty
1149

1150
@noinline function poptask(W::StickyWorkqueue)
1,370,814✔
1151
    task = trypoptask(W)
1,370,814✔
1152
    if !(task isa Task)
1,370,814✔
1153
        task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
114,977✔
1154
    end
1155
    set_next_task(task)
1,370,788✔
1156
    nothing
1157
end
1158

1159
function wait()
1,370,814✔
1160
    GC.safepoint()
1,370,814✔
1161
    W = workqueue_for(Threads.threadid())
1,370,814✔
1162
    poptask(W)
1,370,814✔
1163
    result = try_yieldto(ensure_rescheduled)
1,370,788✔
1164
    process_events()
1,207,389✔
1165
    # return when we come out of the queue
1166
    return result
1,207,389✔
1167
end
1168

1169
if Sys.iswindows()
1170
    pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
×
1171
else
1172
    pause() = ccall(:pause, Cvoid, ())
×
1173
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc