• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / 1649

26 Mar 2026 12:39AM UTC coverage: 77.623% (+0.8%) from 76.798%
1649

push

buildkite

web-flow
Make `SourceRef` contain `Ref{SourceFile}` instead of `SourceFile` (#61402)

64568 of 83181 relevant lines covered (77.62%)

23641174.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.96
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
public Condition, threadpoolsize, ngcthreads
7

8
"""
9
    Threads.threadid([t::Task])::Int
10

11
Get the ID number of the current thread of execution, or the thread of task
12
`t`. The master thread has ID `1`.
13

14
# Examples
15
```julia-repl
16
julia> Threads.threadid()
17
1
18

19
julia> Threads.@threads for i in 1:4
20
          println(Threads.threadid())
21
       end
22
4
23
2
24
5
25
4
26

27
julia> Threads.threadid(Threads.@spawn "foo")
28
2
29
```
30

31
!!! note
32
    The thread that a task runs on may change if the task yields, which is known as [`Task Migration`](@ref man-task-migration).
33
    For this reason in most cases it is not safe to use `threadid([task])` to index into, say, a vector of buffers or stateful
34
    objects.
35
"""
36
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)
279,178,627✔
37

38
# lower bound on the largest threadid()
"""
    Threads.maxthreadid()::Int

Get a lower bound on the number of threads (across all thread pools) available
to the Julia process, with atomic-acquire semantics. The result will always be
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
any task you were able to observe before calling `maxthreadid`.
"""
function maxthreadid()
    # Acquire-load the runtime's global thread count so any thread whose id we
    # previously observed is guaranteed to be counted.
    raw = Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire)
    return Int(raw)
end
48

49
"""
50
    Threads.nthreads(:default | :interactive)::Int
51

52
Get the current number of threads within the specified thread pool. The threads in `:interactive`
53
have id numbers `1:nthreads(:interactive)`, and the threads in `:default` have id numbers in
54
`nthreads(:interactive) .+ (1:nthreads(:default))`.
55

56
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
57
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
58
standard library and [`Threads.maxthreadid()`](@ref).
59
"""
60
nthreads(pool::Symbol) = threadpoolsize(pool)
1,560✔
61

62
# Number of threads in the pool with the given runtime pool id.
function _nthreads_in_pool(tpid::Int8)
    counts = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
    # The C array is indexed by 0-based pool id; `unsafe_load` is 1-based.
    return Int(unsafe_load(counts, tpid + 1))
end
66

67
# Map a runtime threadpool id to its symbolic name.
function _tpid_to_sym(tpid::Int8)
    tpid == 0 && return :interactive
    tpid == 1 && return :default
    tpid == -1 && return :foreign
    throw(ArgumentError(LazyString("Unrecognized threadpool id ", tpid)))
end
78

79
# Map a symbolic threadpool name to its runtime pool id; the inverse of
# `_tpid_to_sym`. Throws `ArgumentError` for any other symbol.
function _sym_to_tpid(tp::Symbol)
    if tp === :interactive
        return Int8(0)
    elseif tp === :default
        return Int8(1)
    elseif tp === :foreign  # was `==`; use `===` for Symbol identity, consistent with the branches above
        return Int8(-1)
    else
        throw(ArgumentError(LazyString("Unrecognized threadpool name `", tp, "`")))
    end
end
90

91
"""
92
    Threads.threadpool(tid = threadid())::Symbol
93

94
Return the specified thread's threadpool; either `:default`, `:interactive`, or `:foreign`.
95
"""
96
function threadpool(tid = threadid())
23✔
97
    tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
326✔
98
    return _tpid_to_sym(tpid)
196✔
99
end
100

101
"""
102
    Threads.threadpooldescription(tid = threadid())::String
103

104
Return the specified thread's threadpool name with extended description where appropriate.
105
"""
106
function threadpooldescription(tid = threadid())
4✔
107
    threadpool_name = threadpool(tid)
8✔
108
    if threadpool_name == :foreign
4✔
109
        # TODO: extend tls to include a field to add a description to a foreign thread and make this more general
110
        n_others = nthreads(:interactive) + nthreads(:default)
×
111
        # Assumes GC threads come first in the foreign thread pool
112
        if tid > n_others && tid <= n_others + ngcthreads()
×
113
            return "foreign: gc"
×
114
        end
115
    end
116
    return string(threadpool_name)
4✔
117
end
118

119
"""
120
    Threads.nthreadpools()::Int
121

122
Return the number of threadpools currently configured.
123
"""
124
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))
2✔
125

126
"""
127
    Threads.threadpoolsize(pool::Symbol = :default)::Int
128

129
Get the number of threads available to the default thread pool (or to the
130
specified thread pool).
131

132
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
133
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
134
[`Distributed`](@ref man-distributed) standard library.
135
"""
136
function threadpoolsize(pool::Symbol = :default)
1,432✔
137
    if pool === :default || pool === :interactive
142,517,135✔
138
        tpid = _sym_to_tpid(pool)
229,340,354✔
139
    elseif pool == :foreign
×
140
        error("Threadpool size of `:foreign` is indeterminant")
×
141
    else
142
        error("invalid threadpool specified")
×
143
    end
144
    return _nthreads_in_pool(tpid)
137,328,634✔
145
end
146

147
"""
148
    threadpooltids(pool::Symbol)
149

150
Return a vector of IDs of threads in the given pool.
151
"""
152
function threadpooltids(pool::Symbol)
4✔
153
    ni = _nthreads_in_pool(Int8(0))
4✔
154
    if pool === :interactive
4✔
155
        return collect(1:ni)
2✔
156
    elseif pool === :default
2✔
157
        return collect(ni+1:ni+_nthreads_in_pool(Int8(1)))
2✔
158
    else
159
        error("invalid threadpool specified")
×
160
    end
161
end
162

163
"""
164
    Threads.ngcthreads()::Int
165

166
Return the number of GC threads currently configured.
167
This includes both mark threads and concurrent sweep threads.
168
"""
169
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1
25✔
170

171
# Run `fun(i)` for `i in 1:threadpoolsize()` on one task per worker and wait
# for all of them. `static=true` pins task `i` to a fixed thread (the :static
# schedule); otherwise tasks go to the :default pool. Throws a single
# `CompositeException` wrapping every task that failed.
function threading_run(fun, static)
    ccall(:jl_enter_threaded_region, Cvoid, ())
    n = threadpoolsize()
    # :default-pool thread ids start after the :interactive pool's threads.
    tid_offset = threadpoolsize(:interactive)
    tasks = Vector{Task}(undef, n)
    try
        for i = 1:n
            t = Task(() -> fun(i)) # pass in tid
            t.sticky = static
            if static
                # Pin the task to thread `tid_offset + i - 1` (0-based thread index).
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid_offset + i-1)
            else
                # TODO: this should be the current pool (except interactive) if there
                # are ever more than two pools.
                _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, _sym_to_tpid(:default))
                @assert _result == 1 "_result != 1"
            end
            tasks[i] = t
            schedule(t)
        end
        # Wait for every task; failures are collected afterwards, not rethrown here.
        for i = 1:n
            Base._wait(tasks[i])
        end
    finally
        # Always leave the threaded region, even if scheduling or waiting threw.
        ccall(:jl_exit_threaded_region, Cvoid, ())
    end
    failed_tasks = filter!(istaskfailed, tasks)
    if !isempty(failed_tasks)
        throw(CompositeException(map(TaskFailedException, failed_tasks)))
    end
end
202

203
# Helper to generate threading run code with schedule checking.
# Returns a quoted block that calls `threading_run(threadsfor_fun, static)`,
# with static=true only for the :static schedule. Nested or concurrent use of
# :static is rejected at run time via `jl_in_threaded_region`.
function _threading_run_expr(schedule)
    quote
        # `schedule` is known at macro-expansion time, so this condition is a
        # compile-time constant in the expanded code.
        if $(schedule === :greedy || schedule === :dynamic || schedule === :default)
            threading_run(threadsfor_fun, false)
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
            error("`@threads :static` cannot be used concurrently or nested")
        else # :static
            threading_run(threadsfor_fun, true)
        end
    end
end
215

216
# Expand `@threads [schedule] for lidx in range; lbody; end`: build the
# `threadsfor_fun` closure (greedy or default flavor) and run it through
# `threading_run`. The expansion evaluates to `nothing`, like a serial `for`.
function _threadsfor(iter, lbody, schedule)
    lidx = iter.args[1]         # index
    range = iter.args[2]
    esc_range = esc(range)
    func = if schedule === :greedy
        greedy_func(esc_range, lidx, lbody)
    else
        default_func(esc_range, lidx, lbody)
    end
    quote
        local threadsfor_fun
        $func
        $(_threading_run_expr(schedule))
        nothing
    end
end
232

233
# Lower a multi-iterator comprehension `[body for a in A, b in B, ...]` to the
# single-iterator form by iterating `Iterators.product(ranges...)` and
# destructuring each product tuple back into the user's loop variables.
function _threadsfor_multi_iterator(body, iterators, condition, schedule, dims, result_type)
    vars = [iter.args[1] for iter in iterators]
    ranges = [iter.args[2] for iter in iterators]

    tuple_var = gensym("iter_tuple")
    assignments = [:($(vars[i]) = $(tuple_var)[$i]) for i in 1:length(vars)]
    # Use let blocks so destructured variables are local to each iteration,
    # avoiding data races when multiple threads execute the body concurrently.
    new_body = Expr(:let, Expr(:block, assignments...), body)
    new_condition = if condition === true
        true
    else
        # The filter condition needs the destructured variables in scope too.
        Expr(:let, Expr(:block, assignments...), condition)
    end

    product_expr = :(Iterators.product($(ranges...)))
    synthetic_iter = :($(tuple_var) = $(product_expr))

    return _threadsfor_single_iterator(new_body, synthetic_iter, new_condition, schedule, dims; result_type)
end
253

254
# Lower a comprehension generator `Expr(:generator, body, iterators...)` for
# `@threads`, dispatching on filtered vs. multi-iterator vs. simple forms.
# `result_type` is the declared eltype of a typed comprehension `T[...]`, or
# `nothing` for an untyped one.
function _threadsfor_comprehension(gen::Expr, schedule, result_type=nothing)
    @assert gen.head === :generator

    body = gen.args[1]

    # Check if the second arg is a filter (handles both single and multi-loop with filters)
    iter_or_filter = gen.args[2]
    if isa(iter_or_filter, Expr) && iter_or_filter.head === :filter
        condition = iter_or_filter.args[1]
        iterators = iter_or_filter.args[2:end]

        if length(iterators) == 1
            return _threadsfor_single_iterator(body, iterators[1], condition, schedule; result_type)
        else
            return _threadsfor_multi_iterator(body, iterators, condition, schedule, nothing, result_type)
        end
    elseif length(gen.args) > 2
        # Multiple unfiltered iterators: the serial comprehension would produce
        # an N-dimensional array, so pass a dims expression for the final reshape.
        iterators = gen.args[2:end]
        ranges = [iter.args[2] for iter in iterators]
        # Use axes to preserve offset index spaces (e.g. OffsetArrays)
        dims_expr = :(tuple($([:(axes($(esc(r)), 1)) for r in ranges]...)))
        return _threadsfor_multi_iterator(body, iterators, true, schedule, dims_expr, result_type)
    else
        return _threadsfor_single_iterator(body, iter_or_filter, true, schedule; result_type)
    end
end
280

281
# Core lowering for a single-iterator comprehension. Builds `threadsfor_fun`
# and an expression assembling the final array. `condition === true` means no
# filter; `dims`, if given, is a quoted tuple used to reshape the flat result;
# `result_type` is the declared eltype of a typed comprehension or `nothing`.
function _threadsfor_single_iterator(body, iterator, condition, schedule, dims=nothing; result_type=nothing)
    lidx = iterator.args[1]
    range = iterator.args[2]
    esc_range = esc(range)
    esc_lidx = esc(lidx)
    esc_body = esc(body)
    esc_condition = condition === true ? true : esc(condition)

    # Fast path: no filter and not greedy — pre-allocate and write directly
    if condition === true && schedule !== :greedy
        return _threadsfor_comprehension_fast(esc_range, esc_lidx, esc_body, schedule, dims, result_type)
    end

    # Filtered or greedy path: `func` evaluates to either a Channel (greedy) or
    # a vector of per-task buffers (default/static); both are bound to
    # `result_channel` in the final quote below.
    func = if schedule === :greedy
        greedy_comprehension_func(esc_range, esc_lidx, esc_body, esc_condition)
    else
        default_comprehension_func(esc_range, esc_lidx, esc_body, esc_condition, result_type)
    end

    result_expr = if schedule === :greedy
        # Greedy: collect values in arrival order (no ordering guarantee)
        if result_type !== nothing
            esc_result_type = esc(result_type)
            quote
                close(result_channel)
                vals = collect(result_channel)
                isempty(vals) ? $esc_result_type[] : $esc_result_type[v for v in vals]
            end
        else
            quote
                close(result_channel)
                collect(result_channel)
            end
        end
    else
        # Default/static: thread-local buffers, vcat in tid order preserves iteration order.
        # For the untyped case, we try vcat first (fast bulk copies). If the buffers have
        # a Union or Any element type — indicating a type-unstable body — we fall back to
        # grow_to! which replicates serial's promote_typejoin widening.
        if result_type !== nothing
            esc_result_type = esc(result_type)
            quote
                vcat(result_channel...)::Vector{$esc_result_type}
            end
        else
            quote
                let _bufs = result_channel
                    _ET = eltype(eltype(_bufs))
                    if isconcretetype(_ET)
                        # All buffers have a concrete element type (from promote_op inference
                        # or a uniform body). Use bulk vcat — same result as grow_to! here.
                        vcat(_bufs...)
                    else
                        # Type-unstable body: grow_to! discovers the correct widened type,
                        # matching the return type of the equivalent serial comprehension.
                        Base.grow_to!(Any[], Iterators.flatten(_bufs))
                    end
                end
            end
        end
    end

    # If dims is provided, reshape the result to match original comprehension dimensions
    if dims !== nothing
        result_expr = quote
            let flat_result = $result_expr
                reshape(flat_result, $(dims))
            end
        end
    end

    quote
        local threadsfor_fun
        local result_channel = $func
        $(_threading_run_expr(schedule))
        $result_expr
    end
end
359

360
# Fast path for non-filtered, non-greedy comprehensions: pre-allocate and write directly.
361
# Non-AbstractArray iterators (e.g. Iterators.flatten) are collected into a Vector
362
# because the parallel work distribution indexes into items with r[i].
363
# AbstractArrays and Tuples are used directly to preserve their index space (e.g. OffsetArrays).
364
function _threadsfor_comprehension_fast(esc_range, esc_lidx, esc_body, schedule, dims, result_type)
450✔
365
    work_dist = _work_distribution_code()
450✔
366
    wrap_final = dims !== nothing ? (x -> :(reshape($x, $dims))) : identity
585✔
367

368
    if result_type !== nothing
450✔
369
        # Typed path: pre-allocate with known element type
370
        esc_result_type = esc(result_type)
150✔
371
        return quote
150✔
372
            let iter = $esc_range
195✔
373
            local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
210✔
374
            local niter = length(items)
195✔
375
            local result = similar(Vector{$esc_result_type}, axes(items))
195✔
376
            if niter > 0
195✔
377
                let items = items, result = result
180✔
378
                local threadsfor_fun
379
                function threadsfor_fun(tid = 1; onethread = false)
916✔
380
                    # Reads: items, tid, onethread. Defines: r, loop_first, loop_last.
381
                    $work_dist
384✔
382
                    for i = loop_first:loop_last
564✔
383
                        local $esc_lidx = @inbounds r[i]
3,495✔
384
                        @inbounds result[i] = $esc_body
3,495✔
385
                    end
3,495✔
386
                end
387
                $(_threading_run_expr(schedule))
388
                end
389
            end
390
            $(wrap_final(:(result)))
195✔
391
            end
392
        end
393
    else
394
        # Untyped path: evaluate first element to determine result type,
395
        # then fill in parallel with the body expression inlined directly
396
        # in the closure. This avoids boxing that occurs when calling a
397
        # lambda whose return type is a Union across closure boundaries.
398
        return quote
300✔
399
            let iter = $esc_range
315✔
400
            local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
405✔
401
            local niter = length(items)
315✔
402
            if niter == 0
315✔
403
                $(wrap_final(:(similar(Vector{Any}, axes(items)))))
15✔
404
            else
405
                local _skip = firstindex(items)
300✔
406
                local $esc_lidx = @inbounds items[_skip]
300✔
407
                local _probe_val = $esc_body
315✔
408
                local result = similar(Vector{typeof(_probe_val)}, axes(items))
300✔
409
                @inbounds result[_skip] = _probe_val
300✔
410
                if niter > 1
300✔
411
                    local _npool = threadpoolsize()
285✔
412
                    local _widen_buffers = [Pair{Int,Any}[] for _ in 1:_npool]
315✔
413
                    let items = items, result = result, _widen_buffers = _widen_buffers,
285✔
414
                        _skip = _skip
415
                    local threadsfor_fun
416
                    function threadsfor_fun(tid = 1; onethread = false)
1,501✔
417
                        # Reads: items, tid, onethread. Defines: r, loop_first, loop_last.
418
                        $work_dist
608✔
419
                        local _T = eltype(result)
600✔
420
                        local _my_widen = _widen_buffers[tid]
600✔
421
                        for i = loop_first:loop_last
1,200✔
422
                            i == _skip && continue
3,185,505✔
423
                            local $esc_lidx = @inbounds r[i]
3,185,220✔
424
                            local _val = $esc_body
3,185,220✔
425
                            if _val isa _T
3,185,220✔
426
                                @inbounds result[i] = _val
3,185,115✔
427
                            else
428
                                push!(_my_widen, i => _val)
105✔
429
                            end
430
                        end
3,185,505✔
431
                    end
432
                    $(_threading_run_expr(schedule))
433
                    end
434
                    result = Base.setindices_widen_up_to(result, _widen_buffers)
285✔
435
                end
436
                $(wrap_final(:(result)))
585✔
437
            end
438
            end
439
        end
440
    end
441
end
442

443

444
# Build `threadsfor_fun` for `@threads :greedy for ...`: a producer task feeds
# items through a bounded Channel and each worker pulls the next item as it
# becomes free. Suits non-uniform workloads; gives no iteration-order guarantee.
function greedy_func(itr, lidx, lbody)
    quote
        let c = Channel{eltype($itr)}(threadpoolsize(), spawn=true) do ch
            for item in $itr
                put!(ch, item)
            end
        end
        function threadsfor_fun(tid)
            # Each worker drains the channel until the producer closes it.
            for item in c
                local $(esc(lidx)) = item
                $(esc(lbody))
            end
        end
        end
    end
end
460

461
# Comprehension variant of `greedy_func`: workers pull items from a bounded
# channel and push body results that pass the filter into an unbounded
# `result_channel`, in completion order (no ordering guarantee).
function greedy_comprehension_func(itr, esc_lidx, esc_body, esc_condition)
    quote
        let c = Channel{eltype($itr)}(threadpoolsize(), spawn=true) do ch
                for item in $itr
                    put!(ch, item)
                end
            end
            # Unbounded so workers never block on the result side.
            result_channel = Channel{Any}(Inf)

            function threadsfor_fun(tid)
                for item in c
                    local $esc_lidx = item
                    if $esc_condition
                        put!(result_channel, $esc_body)
                    end
                end
            end
            # The quote evaluates to the channel; the caller binds and drains it.
            result_channel
        end
    end
end
482

483
# Helper function to generate work distribution code.
# The quoted block is spliced into a `threadsfor_fun(tid; onethread)` body; it
# expects `items`, `tid`, and `onethread` in the surrounding scope and defines
# `r`, `loop_first`, `loop_last`. Note the early `return` for surplus threads
# when there are fewer iterations than workers.
function _work_distribution_code()
    quote
        r = items # Load into local variable
        lenr = length(r)
        # divide loop iterations among threads
        if onethread
            tid = 1
            len, rem = lenr, 0
        else
            len, rem = divrem(lenr, threadpoolsize())
        end
        # not enough iterations for all the threads?
        if len == 0
            if tid > rem
                # This worker gets nothing; exit threadsfor_fun immediately.
                return
            end
            len, rem = 1, 0
        end
        # compute this thread's iterations
        loop_first = firstindex(r) + ((tid-1) * len)
        loop_last = loop_first + len - 1
        # distribute remaining iterations evenly
        if rem > 0
            if tid <= rem
                # The first `rem` workers each absorb one extra iteration.
                loop_first = loop_first + (tid-1)
                loop_last = loop_last + tid
            else
                loop_first = loop_first + rem
                loop_last = loop_last + rem
            end
        end
    end
end
517

518
# Build `threadsfor_fun` for the :dynamic/:static schedules: each task runs a
# contiguous chunk of the iteration space as computed by _work_distribution_code.
function default_func(itr, lidx, lbody)
    work_dist = _work_distribution_code()
    quote
        # Bind the iterable once so the closure captures a single local.
        let items = $itr
        function threadsfor_fun(tid = 1; onethread = false)
            # Reads: items, tid, onethread. Defines: r, loop_first, loop_last.
            $work_dist
            for i = loop_first:loop_last
                local $(esc(lidx)) = @inbounds r[i]
                $(esc(lbody))
            end
        end
        end
    end
end
533

534
# Comprehension variant of `default_func` for filtered comprehensions under the
# :dynamic/:static schedules: each task appends matching body results to its own
# buffer; the quote evaluates to the vector of per-task buffers.
function default_comprehension_func(itr, esc_lidx, esc_body, esc_condition, result_type=nothing)
    work_dist = _work_distribution_code()
    if result_type !== nothing
        # Typed comprehension: element type known at macro expansion time.
        buf_init = :($(esc(result_type))[])
        buf_type_setup = :()
    else
        # Untyped comprehension: use promote_op to pre-type the per-task buffers,
        # avoiding boxing in the parallel phase for type-stable bodies.
        # The result is flattened through grow_to! so the final element type matches
        # serial's runtime promote_typejoin widening rather than the static promote_op type.
        _ET = gensym(:ET)
        buf_type_setup = :(local $_ET = Base.promote_op($esc_lidx -> $esc_body, eltype(items)))
        buf_init = :($_ET[])
    end
    quote
        let iter = $itr
        local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
        local _npool = threadpoolsize()
        $buf_type_setup
        # One buffer per task-id; tasks process contiguous ranges so concatenating
        # in tid order preserves iteration order without a sort step.
        local local_bufs = [$buf_init for _ in 1:_npool]

        function threadsfor_fun(tid = 1; onethread = false)
            # Reads: items, tid, onethread. Defines: r, loop_first, loop_last.
            $work_dist
            local buf = local_bufs[tid]
            for i = loop_first:loop_last
                local $esc_lidx = @inbounds r[i]
                if $esc_condition
                    push!(buf, $esc_body)
                end
            end
        end
        local_bufs  # Return per-task buffers to be vcat'd after threading_run
        end
    end
end
573

574
"""
575
    Threads.@threads [schedule] for ... end
576
    Threads.@threads [schedule] [expr for ... end]
577
    Threads.@threads [schedule] T[expr for ... end]
578

579
A macro to execute a `for` loop or array comprehension in parallel. The iteration space is distributed to
580
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
581
execution of the loop waits for the evaluation of all iterations.
582

583
For `for` loops, the macro executes the loop body in parallel but does not return a value.
584
For array comprehensions, the macro executes the comprehension in parallel and returns
585
the collected results as an array.
586

587
Tasks spawned by `@threads` are scheduled on the `:default` threadpool. This means that
588
`@threads` will not use threads from the `:interactive` threadpool, even if called from
589
the main thread or from a task in the interactive pool. The `:default` threadpool is
590
intended for compute-intensive parallel workloads.
591

592
See also: [`@spawn`](@ref Threads.@spawn) and
593
`pmap` in [`Distributed`](@ref man-distributed).
594
For more information on threadpools, see the chapter on [threadpools](@ref man-threadpools).
595

596
# Extended help
597

598
## Semantics
599

600
Unless stronger guarantees are specified by the scheduling option, the loop executed by the
601
`@threads` macro has the following semantics.
602

603
The `@threads` macro executes the loop body in an unspecified order and potentially
604
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
605
The assignments can be different for each execution. The loop body code (including any code
606
transitively called from it) must not make any assumptions about the distribution of
607
iterations to tasks or the worker thread in which they are executed. The loop body for each
608
iteration must be able to make forward progress independent of other iterations and be free
609
from data races. As such, invalid synchronizations across iterations may deadlock while
610
unsynchronized memory accesses may result in undefined behavior.
611

612
For example, the above conditions imply that:
613

614
- A lock taken in an iteration *must* be released within the same iteration.
615
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
616
- Write only to locations not shared across iterations (unless a lock or atomic operation is
617
  used).
618
- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
619
  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
620

621
## Schedulers
622

623
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
624
releases. Currently, `:dynamic` is used when the scheduler is not specified.
625

626
!!! compat "Julia 1.5"
627
    The `schedule` argument is available as of Julia 1.5.
628

629
### `:dynamic` (default)
630

631
`:dynamic` scheduler executes iterations dynamically to available worker threads. Current
632
implementation assumes that the workload for each iteration is uniform. However, this
633
assumption may be removed in the future.
634

635
This scheduling option is merely a hint to the underlying execution mechanism. However, a
636
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
637
bounded by a small constant multiple of the number of available worker threads
638
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
639
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
640
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
641
larger than the number of the worker threads and the run-time of `f(x)` is relatively
642
smaller than the cost of spawning and synchronizing a task (typically less than 10
643
microseconds).
644

645
!!! compat "Julia 1.8"
646
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
647

648
### `:greedy`
649

650
`:greedy` scheduler spawns up to [`Threads.threadpoolsize()`](@ref) tasks, each greedily working on
651
the given iterated values as they are produced. As soon as one task finishes its work, it takes
652
the next value from the iterator. Work done by any individual task is not necessarily on
653
contiguous values from the iterator. The given iterator may produce values forever, only the
654
iterator interface is required (no indexing).
655

656
This scheduling option is generally a good choice if the workload of individual iterations
657
is not uniform/has a large spread.
658

659
!!! compat "Julia 1.11"
660
    The `:greedy` option for the `schedule` argument is available as of Julia 1.11.
661

662
### `:static`
663

664
`:static` scheduler creates one task per thread and divides the iterations equally among
665
them, assigning each task specifically to each thread. In particular, the value of
666
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
667
Specifying `:static` is an error if used from inside another `@threads` loop or from a
668
thread other than 1.
669

670
!!! note
671
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
672
    In newly written library functions, `:static` scheduling is discouraged because the
673
    functions using this option cannot be called from arbitrary worker threads.
674

675
## Examples
676

677
### For loops
678

679
To illustrate the different scheduling strategies, consider the following function
680
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
681

682
```julia-repl
683
julia> function busywait(seconds)
684
            tstart = time_ns()
685
            while (time_ns() - tstart) / 1e9 < seconds
686
            end
687
        end
688

689
julia> @time begin
690
            Threads.@spawn busywait(5)
691
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
692
                busywait(1)
693
            end
694
        end
695
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
696

697
julia> @time begin
698
            Threads.@spawn busywait(5)
699
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
700
                busywait(1)
701
            end
702
        end
703
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
704
```
705

706
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
707
to run two of the 1-second iterations to complete the for loop.
708

709
### Array comprehensions
710

711
The `@threads` macro also supports array comprehensions, which return the collected results.
712
Array comprehensions preserve element order for `:static` and `:dynamic` (default) scheduling.
713
The `:greedy` scheduler does not guarantee element order, since tasks consume work items as
714
they become available. Multi-dimensional comprehensions preserve the dimensions of the
715
original comprehension (e.g., `[f(i,j) for i in 1:n, j in 1:m]` returns an `n×m` matrix).
716
Typed comprehensions (`T[expr for ...]`) are also supported and return an array with the
717
specified element type.
718

719
For non-filtered comprehensions with non-`:greedy` scheduling, a fast path is used that
720
pre-allocates the result array and writes directly by index, avoiding Channel overhead.
721

722
!!! tip "Performance tip"
723
    For best performance, use typed comprehensions (`T[expr for ...]`) when the element type
724
    is known. Untyped comprehensions infer the type from the first result; if later results
725
    have incompatible types, a type-widening path is used which may be slower.
726

727
!!! warning
728
    The body expression of a threaded comprehension may execute on any thread and may
729
    migrate between threads. Do not rely on [`threadid()`](@ref Threads.threadid) or
730
    [`task_local_storage()`](@ref) returning consistent values within the body expression.
731

732
```julia-repl
733
julia> Threads.@threads [i^2 for i in 1:5] # Simple comprehension
734
5-element Vector{Int64}:
735
   1
736
   4
737
   9
738
  16
739
  25
740

741
julia> Threads.@threads [i^2 for i in 1:5 if iseven(i)] # Filtered comprehension
742
2-element Vector{Int64}:
743
   4
744
  16
745

746
julia> Threads.@threads [i + j for i in 1:3, j in 1:3] # Multiple loops
747
3×3 Matrix{Int64}:
748
 2  3  4
749
 3  4  5
750
 4  5  6
751

752
julia> Threads.@threads Float64[i^2 for i in 1:5] # Typed comprehension
753
5-element Vector{Float64}:
754
  1.0
755
  4.0
756
  9.0
757
 16.0
758
 25.0
759
```
760

761
When the iterator doesn't have a known length, such as a channel, the `:greedy` scheduling
762
option can be used.
763
```julia-repl
764
julia> c = Channel(5, spawn=true) do ch
765
           foreach(i -> put!(ch, i), 1:5)
766
       end;
767

768
julia> Threads.@threads :greedy [i^2 for i in c if iseven(i)]
769
2-element Vector{Int64}:
770
  4
771
 16
772

773
julia> # Non-indexable iterators are also supported
774
       Threads.@threads [i for i in Iterators.flatten([1:3, 4:6])]
775
6-element Vector{Int64}:
776
 1
777
 2
778
 3
779
 4
780
 5
781
 6
782
```
783
"""
784
macro threads(args...)
    # Parse the optional schedule argument, then lower the loop or
    # comprehension to the matching implementation helper.
    nargs = length(args)
    if nargs == 2
        sched, ex = args
        if sched isa QuoteNode
            sched = sched.value
        elseif sched isa Symbol
            # for now only allow quoted symbols
            sched = nothing
        end
        if !(sched === :static || sched === :dynamic || sched === :greedy)
            throw(ArgumentError("unsupported schedule argument in @threads"))
        end
    elseif nargs == 1
        sched = :default
        ex = args[1]
    else
        throw(ArgumentError("wrong number of arguments in @threads"))
    end
    head = ex isa Expr ? ex.head : nothing
    if head === :comprehension
        # Untyped comprehension: [expr for ...]
        return _threadsfor_comprehension(ex.args[1], sched)
    elseif head === :typed_comprehension
        # Typed comprehension: T[expr for ...]; args[1] is the element type T.
        return _threadsfor_comprehension(ex.args[2], sched, ex.args[1])
    elseif head === :for
        # Only a single outer iteration spec (an `=` assignment) is supported.
        if !(ex.args[1] isa Expr && ex.args[1].head === :(=))
            throw(ArgumentError("nested outer loops are not currently supported by @threads"))
        end
        return _threadsfor(ex.args[1], ex.args[2], sched)
    else
        throw(ArgumentError("@threads requires a `for` loop or comprehension expression"))
    end
end
820

821
"""
    _spawn_set_thrpool(t::Task, tp::Symbol)

Assign task `t` to the threadpool named `tp`. If `tp` is not a recognized
pool name, or the named pool has no threads, the task is assigned to the
`:default` pool instead.
"""
function _spawn_set_thrpool(t::Task, tp::Symbol)
    poolid = _sym_to_tpid(tp)
    # -1 signals an unrecognized pool name; a pool with zero threads can
    # never run the task. Fall back to the default pool in either case.
    if poolid == -1 || _nthreads_in_pool(poolid) == 0
        poolid = _sym_to_tpid(:default)
    end
    _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, poolid)
    @assert _result == 1 "_result != 1"
    return nothing
end
830

831
"""
832
    Threads.@spawn [:default|:interactive|:samepool] expr
833

834
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
835
thread in the specified threadpool: `:default`, `:interactive`, or `:samepool`
836
to use the same as the caller. `:default` is used if unspecified. The task is
837
allocated to a thread once one becomes available. To wait for the task to
838
finish, call [`wait`](@ref) on the result of this macro, or call
839
[`fetch`](@ref) to wait and then obtain its return value.
840

841
Values can be interpolated into `@spawn` via `\$`, which copies the value
842
directly into the constructed underlying closure. This allows you to insert
843
the _value_ of a variable, isolating the asynchronous code from changes to
844
the variable's value in the current task.
845

846
!!! note
847
    The thread that the task runs on may change if the task yields, therefore `threadid()` should not
848
    be treated as constant for a task. See [`Task Migration`](@ref man-task-migration), and the broader
849
    [multi-threading](@ref man-multithreading) manual for further important caveats.
850
    See also the chapter on [threadpools](@ref man-threadpools).
851

852
!!! compat "Julia 1.3"
853
    This macro is available as of Julia 1.3.
854

855
!!! compat "Julia 1.4"
856
    Interpolating values via `\$` is available as of Julia 1.4.
857

858
!!! compat "Julia 1.9"
859
    A threadpool may be specified as of Julia 1.9.
860

861
!!! compat "Julia 1.12"
862
    The same threadpool may be specified as of Julia 1.12.
863

864
# Examples
865
```julia-repl
866
julia> t() = println("Hello from ", Threads.threadid());
867

868
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
869
Hello from 1
870
Hello from 1
871
Hello from 3
872
Hello from 4
873
```
874
"""
875
macro spawn(args...)
    # Threadpool the task will be assigned to; defaults to :default.
    tp = QuoteNode(:default)
    na = length(args)
    if na == 2
        # Two-argument form: first is the threadpool, second the expression.
        ttype, ex = args
        if ttype isa QuoteNode
            # Literal pool symbol (e.g. `:interactive`): validate at macro
            # expansion time.
            ttype = ttype.value
            if !in(ttype, (:interactive, :default, :samepool))
                throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", ttype)))
            end
            tp = QuoteNode(ttype)
        else
            # Not a literal symbol: defer evaluation of the pool expression
            # to run time (see the `tp == :samepool` check below).
            tp = ttype
        end
    elseif na == 1
        ex = args[1]
    else
        throw(ArgumentError("wrong number of arguments in @spawn"))
    end

    # Handle a single `$`-interpolation in `ex`: the value is hoisted into a
    # `let` binding so the closure captures the value, not the variable
    # (documented in the docstring above).
    letargs = Base._lift_one_interp!(ex)

    # Wrap the body in a zero-argument closure, pointing its line numbers at
    # the caller's location for accurate backtraces.
    thunk = Base.replace_linenums!(:(()->($(esc(ex)))), __source__)
    var = esc(Base.sync_varname)
    quote
        let $(letargs...)
            local task = Task($thunk)
            # Non-sticky: the task may migrate between threads when it yields.
            task.sticky = false
            local tp = $(esc(tp))
            if tp == :samepool
                # Resolve :samepool to the caller's current pool at run time.
                tp = Threads.threadpool()
            end
            _spawn_set_thrpool(task, tp)
            # If an enclosing @sync block's collection variable is in scope,
            # register the task with it so @sync can wait on it.
            if $(Expr(:islocal, var))
                put!($var, task)
            end
            schedule(task)
            task
        end
    end
end
916

917
# This is a stub that can be overloaded for downstream structures like `Channel`.
# No methods are defined here; e.g. the Channel-based `Threads.foreach`
# implementation adds them elsewhere — NOTE(review): confirm against that code.
function foreach end
919

920
# Scheduling traits that can be employed for downstream overloads
# (e.g. methods of `foreach` above can dispatch on these singleton types).
abstract type AbstractSchedule end
# Trait selecting a static work-assignment policy — presumably a fixed
# up-front partition of iterations; confirm against the overloads using it.
struct StaticSchedule <: AbstractSchedule end
# Trait selecting a fair/dynamic work-assignment policy.
struct FairSchedule <: AbstractSchedule end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc