• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / 1500

09 Apr 2026 09:42PM UTC coverage: 77.893% (+0.1%) from 77.768%
1500

push

buildkite

web-flow
fix aliasing of PermutedDimsArray (#61542)

`dataids` was falling back to `objectid` which is wrong, as a
`PermutedDimsArray` plainly aliases its parent

2 of 2 new or added lines in 1 file covered. (100.0%)

67 existing lines in 6 files now uncovered.

65161 of 83654 relevant lines covered (77.89%)

23981703.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.51
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
public Condition, threadpoolsize, ngcthreads
7

8
"""
9
    Threads.threadid([t::Task])::Int
10

11
Get the ID number of the current thread of execution, or the thread of task
12
`t`. The master thread has ID `1`.
13

14
# Examples
15
```julia-repl
16
julia> Threads.threadid()
17
1
18

19
julia> Threads.@threads for i in 1:4
20
          println(Threads.threadid())
21
       end
22
4
23
2
24
5
25
4
26

27
julia> Threads.threadid(Threads.@spawn "foo")
28
2
29
```
30

31
!!! note
32
    The thread that a task runs on may change if the task yields, which is known as [`Task Migration`](@ref man-task-migration).
33
    For this reason in most cases it is not safe to use `threadid([task])` to index into, say, a vector of buffers or stateful
34
    objects.
35
"""
36
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)
336,671,762✔
37

38
# lower bound on the largest threadid()
"""
    Threads.maxthreadid()::Int

Get a lower bound on the number of threads (across all thread pools) available
to the Julia process, with atomic-acquire semantics. The result will always be
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
any task you were able to observe before calling `maxthreadid`.
"""
function maxthreadid()
    # Acquire ordering pairs with the runtime's publication of new threads, which is
    # what makes the "observed before calling" guarantee above hold.
    count_c = Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire)
    return Int(count_c)
end
48

49
"""
50
    Threads.nthreads(:default | :interactive)::Int
51

52
Get the current number of threads within the specified thread pool. The threads in `:interactive`
53
have id numbers `1:nthreads(:interactive)`, and the threads in `:default` have id numbers in
54
`nthreads(:interactive) .+ (1:nthreads(:default))`.
55

56
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
57
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
58
standard library and [`Threads.maxthreadid()`](@ref).
59
"""
60
nthreads(pool::Symbol) = threadpoolsize(pool)
1,562✔
61

62
# Number of threads in the pool with runtime pool id `tpid`.
# `jl_n_threads_per_pool` points at a C `int` array with one entry per pool; the pool id is
# 0-based, hence the `+ 1` for Julia's 1-based `unsafe_load` index.
function _nthreads_in_pool(tpid::Int8)
    per_pool = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
    n = unsafe_load(per_pool, tpid + 1)
    return Int(n)
end
66

67
# Map a runtime threadpool id to its public name:
# 0 => :interactive, 1 => :default, -1 => :foreign. Any other id throws an `ArgumentError`.
function _tpid_to_sym(tpid::Int8)
    tpid == 0 && return :interactive
    tpid == 1 && return :default
    tpid == -1 && return :foreign
    throw(ArgumentError(LazyString("Unrecognized threadpool id ", tpid)))
end
78

79
# Inverse of `_tpid_to_sym`: map a threadpool name to the runtime pool id
# (:interactive => 0, :default => 1, :foreign => -1). Unknown names throw an `ArgumentError`.
function _sym_to_tpid(tp::Symbol)
    tp === :interactive && return Int8(0)
    tp === :default && return Int8(1)
    tp === :foreign && return Int8(-1)
    throw(ArgumentError(LazyString("Unrecognized threadpool name `", tp, "`")))
end
90

91
"""
    Threads.threadpool(tid = threadid())::Symbol

Return the specified thread's threadpool; either `:default`, `:interactive`, or `:foreign`.
"""
function threadpool(tid = threadid())
    # The runtime numbers threads from 0, so shift the 1-based public id down by one.
    return _tpid_to_sym(ccall(:jl_threadpoolid, Int8, (Int16,), tid - 1))
end
100

101
"""
    Threads.threadpooldescription(tid = threadid())::String

Return the specified thread's threadpool name with extended description where appropriate.
"""
function threadpooldescription(tid = threadid())
    pool = threadpool(tid)
    if pool == :foreign
        # TODO: extend tls to include a field to add a description to a foreign thread and make this more general
        # Ids past the interactive and default pools are treated as GC threads first.
        n_regular = nthreads(:interactive) + nthreads(:default)
        if tid > n_regular && tid - n_regular <= ngcthreads()
            return "foreign: gc"
        end
    end
    return string(pool)
end
118

119
"""
    Threads.nthreadpools()::Int

Return the number of threadpools currently configured.
"""
function nthreadpools()
    npools = unsafe_load(cglobal(:jl_n_threadpools, Cint))
    return Int(npools)
end
125

126
"""
    Threads.threadpoolsize(pool::Symbol = :default)::Int

Get the number of threads available to the default thread pool (or to the
specified thread pool).

See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
[`Distributed`](@ref man-distributed) standard library.
"""
function threadpoolsize(pool::Symbol = :default)
    # Only the two sized pools are accepted; `:foreign` threads have no fixed count.
    if pool === :foreign
        error("Threadpool size of `:foreign` is indeterminant")
    elseif pool !== :default && pool !== :interactive
        error("invalid threadpool specified")
    end
    return _nthreads_in_pool(_sym_to_tpid(pool))
end
146

147
"""
    threadpooltids(pool::Symbol)

Return a vector of IDs of threads in the given pool.
"""
function threadpooltids(pool::Symbol)
    # Interactive threads occupy ids 1:n_interactive; default threads follow them.
    # Only :interactive and :default are supported.
    n_interactive = _nthreads_in_pool(Int8(0))
    if pool === :interactive
        ids = 1:n_interactive
    elseif pool === :default
        ids = n_interactive .+ (1:_nthreads_in_pool(Int8(1)))
    else
        error("invalid threadpool specified")
    end
    return collect(ids)
end
162

163
"""
    Threads.ngcthreads()::Int

Return the number of GC threads currently configured.
This includes both mark threads and concurrent sweep threads.
"""
function ngcthreads()
    # One more than the runtime's `jl_n_gcthreads` counter.
    n_runtime = unsafe_load(cglobal(:jl_n_gcthreads, Cint))
    return Int(n_runtime) + 1
end
170

171
# Run `fun(i)` for `i = 1:threadpoolsize()` on one task per `:default` pool thread, wait
# for all of them, and throw a `CompositeException` wrapping a `TaskFailedException` for
# every task that failed. Returns `nothing` when all tasks succeed.
#
# `static == true` makes every task sticky and pins task `i` to runtime thread id
# `threadpoolsize(:interactive) + i - 1` (runtime ids are 0-based and the `:default` threads
# come after the `:interactive` ones). Otherwise tasks are not sticky and are only
# restricted to the `:default` pool.
function threading_run(fun, static)
    # Enter the runtime's threaded region; the matching exit is in the `finally` below.
    ccall(:jl_enter_threaded_region, Cvoid, ())
    n = threadpoolsize()
    tid_offset = threadpoolsize(:interactive)
    tasks = Vector{Task}(undef, n)
    try
        for i = 1:n
            t = Task(() -> fun(i)) # pass in tid
            t.sticky = static
            if static
                # Pin task i to the i-th :default thread (0-based runtime id).
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid_offset + i-1)
            else
                # TODO: this should be the current pool (except interactive) if there
                # are ever more than two pools.
                _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, _sym_to_tpid(:default))
                @assert _result == 1 "_result != 1"
            end
            tasks[i] = t
            schedule(t)
        end
        for i = 1:n
            Base._wait(tasks[i])
        end
    finally
        # NOTE(review): if an exception escapes the loops above, the region is still exited
        # here, but tasks that were already scheduled are not waited for.
        ccall(:jl_exit_threaded_region, Cvoid, ())
    end
    # `tasks` is local, so filtering it in place is safe.
    failed_tasks = filter!(istaskfailed, tasks)
    if !isempty(failed_tasks)
        throw(CompositeException(map(TaskFailedException, failed_tasks)))
    end
end
202

203
# Helper to generate threading run code with schedule checking
#
# Returns an expression that launches `threadsfor_fun`, which must be defined by the
# surrounding generated code. :greedy, :dynamic and :default run with non-sticky tasks.
# Every other schedule is treated as :static: it runs with sticky tasks and raises an
# error when a threaded region is already active (nested or concurrent use).
function _threading_run_expr(schedule)
    is_dynamic = schedule === :greedy || schedule === :dynamic || schedule === :default
    return quote
        if $is_dynamic
            threading_run(threadsfor_fun, false)
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
            error("`@threads :static` cannot be used concurrently or nested")
        else # :static
            threading_run(threadsfor_fun, true)
        end
    end
end
215

216
# Lower `@threads [schedule] for lidx in range; lbody; end`.
#
# `iter` is the `lidx = range` expression. The generated code defines the per-task worker
# `threadsfor_fun` (greedy or contiguous-chunk variant), launches it for `schedule`, and
# evaluates to `nothing`.
function _threadsfor(iter, lbody, schedule)
    loopvar, loopsrc = iter.args[1], iter.args[2]
    builder = schedule === :greedy ? greedy_func : default_func
    worker_def = builder(esc(loopsrc), loopvar, lbody)
    launch = _threading_run_expr(schedule)
    return quote
        local threadsfor_fun
        $worker_def
        $launch
        nothing
    end
end
232

233
# Lower a multi-iterator comprehension (`[body for a in A, b in B ...]`, optionally
# filtered) onto the single-iterator machinery. The code iterates
# `Iterators.product(A, B, ...)` through one gensym'd tuple variable and destructures it
# back into the user's loop variables inside both the body and the filter condition.
function _threadsfor_multi_iterator(body, iterators, condition, schedule, dims, result_type)
    loopvars = [spec.args[1] for spec in iterators]
    sources = [spec.args[2] for spec in iterators]

    tup = gensym("iter_tuple")
    unpackers = [:($(loopvars[k]) = $(tup)[$k]) for k in 1:length(loopvars)]
    # Use let blocks so destructured variables are local to each iteration,
    # avoiding data races when multiple threads execute the body concurrently.
    wrapped_body = Expr(:let, Expr(:block, unpackers...), body)
    wrapped_condition = condition === true ? true : Expr(:let, Expr(:block, unpackers...), condition)

    product_iter = :($(tup) = Iterators.product($(sources...)))
    return _threadsfor_single_iterator(wrapped_body, product_iter, wrapped_condition, schedule, dims; result_type)
end
253

254
# Dispatch an `@threads` comprehension to the single- or multi-iterator lowering.
#
# `gen` is the `:generator` expression of `[body for ...]`. Its second argument is either a
# `:filter` node (when an `if` clause is present) or the first iteration spec.
# `result_type` is the element type `T` of a typed comprehension `T[...]`, or `nothing`.
function _threadsfor_comprehension(gen::Expr, schedule, result_type=nothing)
    @assert gen.head === :generator

    body = gen.args[1]

    # Check if the second arg is a filter (handles both single and multi-loop with filters)
    iter_or_filter = gen.args[2]
    if isa(iter_or_filter, Expr) && iter_or_filter.head === :filter
        # Filtered comprehensions produce a flat vector, so no dims are passed on.
        condition = iter_or_filter.args[1]
        iterators = iter_or_filter.args[2:end]

        if length(iterators) == 1
            return _threadsfor_single_iterator(body, iterators[1], condition, schedule; result_type)
        else
            return _threadsfor_multi_iterator(body, iterators, condition, schedule, nothing, result_type)
        end
    elseif length(gen.args) > 2
        iterators = gen.args[2:end]
        ranges = [iter.args[2] for iter in iterators]
        # A serial `[f(x, y) for x in A, y in B]` has shape `(size(A)..., size(B)...)`, so
        # every axis of every iterator contributes to the result shape, not only the first.
        # Using only `axes(r, 1)` made the final reshape fail for non-vector iterators.
        # Use axes to preserve offset index spaces (e.g. OffsetArrays).
        # NOTE(review): each range expression also appears in the product iterator built by
        # `_threadsfor_multi_iterator`, so it is evaluated twice.
        dims_expr = :(tuple($([:(axes($(esc(r)))...) for r in ranges]...)))
        return _threadsfor_multi_iterator(body, iterators, true, schedule, dims_expr, result_type)
    else
        return _threadsfor_single_iterator(body, iter_or_filter, true, schedule; result_type)
    end
end
280

281
# Build the expression for a single-iterator `@threads` comprehension.
#
# `iterator` is an `lidx = range` expression. `condition` is the filter expression, or the
# literal `true` when there is no `if` clause. `dims`, when not `nothing`, is an expression
# for the shape the flat result is reshaped to. `result_type` is the element type of a typed
# comprehension (`T[...]`), or `nothing`.
#
# Unfiltered, non-greedy comprehensions are delegated to `_threadsfor_comprehension_fast`.
# Otherwise the generated code binds `result_channel` to whatever `func` evaluates to. For
# :greedy that is a `Channel`; for other schedules it is a vector of per-task buffers,
# despite the name. The code then runs the workers via `_threading_run_expr` and turns
# `result_channel` into the result array.
function _threadsfor_single_iterator(body, iterator, condition, schedule, dims=nothing; result_type=nothing)
    lidx = iterator.args[1]
    range = iterator.args[2]
    esc_range = esc(range)
    esc_lidx = esc(lidx)
    esc_body = esc(body)
    esc_condition = condition === true ? true : esc(condition)

    # Fast path: no filter and not greedy — pre-allocate and write directly
    if condition === true && schedule !== :greedy
        return _threadsfor_comprehension_fast(esc_range, esc_lidx, esc_body, schedule, dims, result_type)
    end

    func = if schedule === :greedy
        greedy_comprehension_func(esc_range, esc_lidx, esc_body, esc_condition)
    else
        default_comprehension_func(esc_range, esc_lidx, esc_body, esc_condition, result_type)
    end

    result_expr = if schedule === :greedy
        # Greedy: collect values in arrival order (no ordering guarantee)
        if result_type !== nothing
            esc_result_type = esc(result_type)
            quote
                close(result_channel)
                vals = collect(result_channel)
                isempty(vals) ? $esc_result_type[] : $esc_result_type[v for v in vals]
            end
        else
            quote
                close(result_channel)
                collect(result_channel)
            end
        end
    else
        # Default/static: thread-local buffers, vcat in tid order preserves iteration order.
        # For the untyped case, we try vcat first (fast bulk copies). If the buffers have
        # a Union or Any element type — indicating a type-unstable body — we fall back to
        # grow_to! which replicates serial's promote_typejoin widening.
        if result_type !== nothing
            esc_result_type = esc(result_type)
            quote
                vcat(result_channel...)::Vector{$esc_result_type}
            end
        else
            quote
                let _bufs = result_channel
                    _ET = eltype(eltype(_bufs))
                    if isconcretetype(_ET)
                        # All buffers have a concrete element type (from promote_op inference
                        # or a uniform body). Use bulk vcat — same result as grow_to! here.
                        vcat(_bufs...)
                    else
                        # Type-unstable body: grow_to! discovers the correct widened type,
                        # matching the return type of the equivalent serial comprehension.
                        Base.grow_to!(Any[], Iterators.flatten(_bufs))
                    end
                end
            end
        end
    end

    # If dims is provided, reshape the result to match original comprehension dimensions
    if dims !== nothing
        result_expr = quote
            let flat_result = $result_expr
                reshape(flat_result, $(dims))
            end
        end
    end

    # `threadsfor_fun` is declared here and assigned by the definition inside `$func`.
    quote
        local threadsfor_fun
        local result_channel = $func
        $(_threading_run_expr(schedule))
        $result_expr
    end
end
359

360
# Fast path for non-filtered, non-greedy comprehensions: pre-allocate and write directly.
# Non-AbstractArray iterators (e.g. Iterators.flatten) are collected into a Vector
# because the parallel work distribution indexes into items with r[i].
# AbstractArrays and Tuples are used directly to preserve their index space (e.g. OffsetArrays).
#
# The result is allocated with `axes(items)`, and each task writes `result[i]` for its own
# contiguous index range, so no merge step is needed. When `dims !== nothing` the final
# array is wrapped in `reshape(_, dims)`. Arguments other than `schedule`, `dims` and
# `result_type` must already be escaped.
function _threadsfor_comprehension_fast(esc_range, esc_lidx, esc_body, schedule, dims, result_type)
    work_dist = _work_distribution_code()
    wrap_final = dims !== nothing ? (x -> :(reshape($x, $dims))) : identity

    if result_type !== nothing
        # Typed path: pre-allocate with known element type
        esc_result_type = esc(result_type)
        return quote
            let iter = $esc_range
            local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
            local niter = length(items)
            local result = similar(Vector{$esc_result_type}, axes(items))
            if niter > 0
                # Re-bind the captured locals so the worker closure sees fresh bindings;
                # presumably to avoid boxing — NOTE(review): confirm.
                let items = items, result = result
                local threadsfor_fun
                function threadsfor_fun(tid = 1)
                    # Reads: items, tid. Defines: r, loop_first, loop_last.
                    $work_dist
                    for i = loop_first:loop_last
                        local $esc_lidx = @inbounds r[i]
                        @inbounds result[i] = $esc_body
                    end
                end
                $(_threading_run_expr(schedule))
                end
            end
            $(wrap_final(:(result)))
            end
        end
    else
        # Untyped path: evaluate first element to determine result type,
        # then fill in parallel with the body expression inlined directly
        # in the closure. This avoids boxing that occurs when calling a
        # lambda whose return type is a Union across closure boundaries.
        return quote
            let iter = $esc_range
            local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
            local niter = length(items)
            if niter == 0
                # Empty input: nothing to probe, so the element type falls back to Any.
                $(wrap_final(:(similar(Vector{Any}, axes(items)))))
            else
                # Probe: evaluate the body for the first element on the calling task and use
                # its concrete type as the initial element type of `result`.
                local _skip = firstindex(items)
                local $esc_lidx = @inbounds items[_skip]
                local _probe_val = $esc_body
                local result = similar(Vector{typeof(_probe_val)}, axes(items))
                @inbounds result[_skip] = _probe_val
                if niter > 1
                    # One buffer per task id for values that do not fit the probed type.
                    local _npool = threadpoolsize()
                    local _widen_buffers = [Pair{Int,Any}[] for _ in 1:_npool]
                    # The outer `result` is reassigned after the run, so the closure captures
                    # these let-bound copies instead (presumably to avoid `Core.Box`).
                    let items = items, result = result, _widen_buffers = _widen_buffers,
                        _skip = _skip
                    local threadsfor_fun
                    function threadsfor_fun(tid = 1)
                        # Reads: items, tid. Defines: r, loop_first, loop_last.
                        $work_dist
                        local _T = eltype(result)
                        local _my_widen = _widen_buffers[tid]
                        for i = loop_first:loop_last
                            # The probe element was already computed above.
                            i == _skip && continue
                            local $esc_lidx = @inbounds r[i]
                            local _val = $esc_body
                            if _val isa _T
                                @inbounds result[i] = _val
                            else
                                # Defer values of another type as `index => value`.
                                push!(_my_widen, i => _val)
                            end
                        end
                    end
                    $(_threading_run_expr(schedule))
                    end
                    # Presumably returns `result` with an element type widened to hold the
                    # deferred values, stored at their indices (defined elsewhere in Base).
                    result = Base.setindices_widen_up_to(result, _widen_buffers)
                end
                $(wrap_final(:(result)))
            end
            end
        end
    end
end
442

443

444
# Generate the `threadsfor_fun` definition for `@threads :greedy for lidx in itr; lbody; end`.
#
# The iterable is evaluated exactly once, on the calling task, and bound to `iter`. Its
# element type and the producer loop both need it, and splicing `$itr` twice would
# evaluate the user's range expression twice. A producer task (`Channel(...; spawn=true)`)
# feeds the elements into a channel buffered to `threadpoolsize()` entries. Every worker
# `threadsfor_fun(tid)` pulls items from that channel until it is closed, so iterations
# are handed out on demand.
#
# `itr` must already be escaped; `lidx` and `lbody` are escaped here.
function greedy_func(itr, lidx, lbody)
    quote
        let iter = $itr
            let c = Channel{eltype(iter)}(threadpoolsize(), spawn=true) do ch
                    for item in iter
                        put!(ch, item)
                    end
                end
                function threadsfor_fun(tid)
                    for item in c
                        local $(esc(lidx)) = item
                        $(esc(lbody))
                    end
                end
            end
        end
    end
end
460

461
# Generate the worker and result channel for a greedy `@threads` comprehension.
#
# The iterable is evaluated exactly once, on the calling task, and bound to `iter`; it is
# needed both for the element type and for the producer loop. A producer task feeds the
# elements into a channel buffered to `threadpoolsize()` entries. Each worker
# `threadsfor_fun(tid)` takes items and, when `esc_condition` holds, puts the body's value
# into an unbounded `Channel{Any}`. That channel is the value of the generated expression.
# Results arrive in completion order, not iteration order.
#
# All arguments must already be escaped (`esc_condition` may be the literal `true`).
function greedy_comprehension_func(itr, esc_lidx, esc_body, esc_condition)
    quote
        let iter = $itr
            let c = Channel{eltype(iter)}(threadpoolsize(), spawn=true) do ch
                    for item in iter
                        put!(ch, item)
                    end
                end
                result_channel = Channel{Any}(Inf)

                function threadsfor_fun(tid)
                    for item in c
                        local $esc_lidx = item
                        if $esc_condition
                            put!(result_channel, $esc_body)
                        end
                    end
                end
                result_channel
            end
        end
    end
end
482

483
# Helper function to generate work distribution code
#
# Returns a code fragment to be spliced at the top of a `threadsfor_fun(tid)` body. It
# reads `items` and `tid` from the surrounding generated code and defines `r` (= `items`),
# `loop_first` and `loop_last`. These give the contiguous index range that task `tid` (one
# of `threadpoolsize()` tasks) processes. With `length(r) = q * n + e` for `n` tasks, the
# first `e` tasks get `q + 1` iterations and the others get `q`. When there are fewer
# iterations than tasks, each of the first `length(r)` tasks takes a single iteration and
# the remaining tasks `return` from the enclosing function immediately.
function _work_distribution_code()
    quote
        r = items # Load into local variable
        chunk, extra = divrem(length(r), threadpoolsize())
        if chunk == 0
            # Not enough iterations for all the tasks.
            tid > extra && return
            chunk, extra = 1, 0
        end
        # Each earlier task among the first `extra` shifts this task's start by one.
        loop_first = firstindex(r) + (tid - 1) * chunk + min(tid - 1, extra)
        loop_last = loop_first + chunk - 1 + (tid <= extra ? 1 : 0)
    end
end
512

513
# Generate the `threadsfor_fun` definition for a non-greedy `@threads for lidx in itr; lbody; end`.
#
# `itr` is evaluated once into `items`. Each worker `threadsfor_fun(tid)` runs the
# contiguous index range `loop_first:loop_last` computed by `_work_distribution_code`.
# Unlike the comprehension paths, `items` is not `collect`ed, so it must support `length`,
# `firstindex` and indexing by the values in `loop_first:loop_last`.
# `itr` must already be escaped; `lidx` and `lbody` are escaped here.
function default_func(itr, lidx, lbody)
    work_dist = _work_distribution_code()
    quote
        let items = $itr
        function threadsfor_fun(tid = 1)
            # Reads: items, tid. Defines: r, loop_first, loop_last.
            $work_dist
            for i = loop_first:loop_last
                local $(esc(lidx)) = @inbounds r[i]
                $(esc(lbody))
            end
        end
        end
    end
end
528

529
# Generate the worker and per-task buffers for a filtered, non-greedy `@threads` comprehension.
#
# The generated expression evaluates to a vector with one buffer per task id
# (`threadpoolsize()` of them). Task `tid` processes a contiguous index range and pushes
# `esc_body` for every element where `esc_condition` holds. Concatenating the buffers in
# `tid` order therefore preserves iteration order. Non-`Tuple`/`AbstractArray` iterables
# are `collect`ed first so they can be indexed. All arguments except `result_type` must
# already be escaped.
function default_comprehension_func(itr, esc_lidx, esc_body, esc_condition, result_type=nothing)
    work_dist = _work_distribution_code()
    if result_type !== nothing
        # Typed comprehension: element type known at macro expansion time.
        buf_init = :($(esc(result_type))[])
        buf_type_setup = :()
    else
        # Untyped comprehension: use promote_op to pre-type the per-task buffers,
        # avoiding boxing in the parallel phase for type-stable bodies.
        # The result is flattened through grow_to! so the final element type matches
        # serial's runtime promote_typejoin widening rather than the static promote_op type.
        #
        # The probe takes the element as ONE argument and binds the loop variable inside,
        # so destructuring loop variables such as `(a, b)` work. Written as
        # `(a, b) -> body`, it would be a two-argument function that never matches the
        # single element type; `promote_op` would then return `Union{}`, and every `push!`
        # into the `Union{}[]` buffers would fail.
        _ET = gensym(:ET)
        probe = :(_elem -> begin
            local $esc_lidx = _elem
            $esc_body
        end)
        buf_type_setup = :(local $_ET = Base.promote_op($probe, eltype(items)))
        buf_init = :($_ET[])
    end
    quote
        let iter = $itr
        local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
        local _npool = threadpoolsize()
        $buf_type_setup
        # One buffer per task-id; tasks process contiguous ranges so concatenating
        # in tid order preserves iteration order without a sort step.
        local local_bufs = [$buf_init for _ in 1:_npool]

        function threadsfor_fun(tid = 1)
            # Reads: items, tid. Defines: r, loop_first, loop_last.
            $work_dist
            local buf = local_bufs[tid]
            for i = loop_first:loop_last
                local $esc_lidx = @inbounds r[i]
                if $esc_condition
                    push!(buf, $esc_body)
                end
            end
        end
        local_bufs  # Return per-task buffers to be vcat'd after threading_run
        end
    end
end
568

569
"""
570
    Threads.@threads [schedule] for ... end
571
    Threads.@threads [schedule] [expr for ... end]
572
    Threads.@threads [schedule] T[expr for ... end]
573

574
A macro to execute a `for` loop or array comprehension in parallel. The iteration space is distributed to
575
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
576
execution of the loop waits for the evaluation of all iterations.
577

578
For `for` loops, the macro executes the loop body in parallel but does not return a value.
579
For array comprehensions, the macro executes the comprehension in parallel and returns
580
the collected results as an array.
581

582
Tasks spawned by `@threads` are scheduled on the `:default` threadpool. This means that
583
`@threads` will not use threads from the `:interactive` threadpool, even if called from
584
the main thread or from a task in the interactive pool. The `:default` threadpool is
585
intended for compute-intensive parallel workloads.
586

587
See also: [`@spawn`](@ref Threads.@spawn) and
588
`pmap` in [`Distributed`](@ref man-distributed).
589
For more information on threadpools, see the chapter on [threadpools](@ref man-threadpools).
590

591
# Extended help
592

593
## Semantics
594

595
Unless stronger guarantees are specified by the scheduling option, the loop executed by
596
`@threads` macro has the following semantics.
597

598
The `@threads` macro executes the loop body in an unspecified order and potentially
599
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
600
The assignments can be different for each execution. The loop body code (including any code
601
transitively called from it) must not make any assumptions about the distribution of
602
iterations to tasks or the worker thread in which they are executed. The loop body for each
603
iteration must be able to make forward progress independent of other iterations and be free
604
from data races. As such, invalid synchronizations across iterations may deadlock while
605
unsynchronized memory accesses may result in undefined behavior.
606

607
For example, the above conditions imply that:
608

609
- A lock taken in an iteration *must* be released within the same iteration.
610
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
611
- Write only to locations not shared across iterations (unless a lock or atomic operation is
612
  used).
613
- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
614
  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
615

616
## Schedulers
617

618
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
619
releases. Currently, `:dynamic` is used when the scheduler is not specified.
620

621
!!! compat "Julia 1.5"
622
    The `schedule` argument is available as of Julia 1.5.
623

624
### `:dynamic` (default)
625

626
The `:dynamic` scheduler dynamically assigns iterations to available worker threads. The current
627
implementation assumes that the workload for each iteration is uniform. However, this
628
assumption may be removed in the future.
629

630
This scheduling option is merely a hint to the underlying execution mechanism. However, a
631
few properties can be expected. The number of `Task`s used by the `:dynamic` scheduler is
632
bounded by a small constant multiple of the number of available worker threads
633
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
634
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
635
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
636
larger than the number of the worker threads and the run-time of `f(x)` is relatively
637
smaller than the cost of spawning and synchronizing a task (typically less than 10
638
microseconds).
639

640
!!! compat "Julia 1.8"
641
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
642

643
### `:greedy`
644

645
The `:greedy` scheduler spawns up to [`Threads.threadpoolsize()`](@ref) tasks, each greedily working on
646
the given iterated values as they are produced. As soon as one task finishes its work, it takes
647
the next value from the iterator. Work done by any individual task is not necessarily on
648
contiguous values from the iterator. The given iterator may produce values forever, only the
649
iterator interface is required (no indexing).
650

651
This scheduling option is generally a good choice if the workload of individual iterations
652
is not uniform or has a large spread.
653

654
!!! compat "Julia 1.11"
655
    The `:greedy` option for the `schedule` argument is available as of Julia 1.11.
656

657
### `:static`
658

659
The `:static` scheduler creates one task per thread and divides the iterations equally among
660
them, assigning each task specifically to each thread. In particular, the value of
661
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
662
Specifying `:static` is an error if used from inside another `@threads` loop or from a
663
thread other than 1.
664

665
!!! note
666
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
667
    In newly written library functions, `:static` scheduling is discouraged because the
668
    functions using this option cannot be called from arbitrary worker threads.
669

670
## Examples
671

672
### For loops
673

674
To illustrate the different scheduling strategies, consider the following function
675
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
676

677
```julia-repl
678
julia> function busywait(seconds)
679
            tstart = time_ns()
680
            while (time_ns() - tstart) / 1e9 < seconds
681
            end
682
        end
683

684
julia> @time begin
685
            Threads.@spawn busywait(5)
686
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
687
                busywait(1)
688
            end
689
        end
690
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
691

692
julia> @time begin
693
            Threads.@spawn busywait(5)
694
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
695
                busywait(1)
696
            end
697
        end
698
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
699
```
700

701
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
702
to run two of the 1-second iterations to complete the for loop.
703

704
### Array comprehensions
705

706
The `@threads` macro also supports array comprehensions, which return the collected results.
707
Array comprehensions preserve element order for `:static` and `:dynamic` (default) scheduling.
708
The `:greedy` scheduler does not guarantee element order, since tasks consume work items as
709
they become available. Multi-dimensional comprehensions preserve the dimensions of the
710
original comprehension (e.g., `[f(i,j) for i in 1:n, j in 1:m]` returns an `n×m` matrix).
711
Typed comprehensions (`T[expr for ...]`) are also supported and return an array with the
712
specified element type.
713

714
For non-filtered comprehensions with non-`:greedy` scheduling, a fast path is used that
715
pre-allocates the result array and writes directly by index, avoiding Channel overhead.
716

717
!!! tip "Performance tip"
718
    For best performance, use typed comprehensions (`T[expr for ...]`) when the element type
719
    is known. Untyped comprehensions infer the type from the first result; if later results
720
    have incompatible types, a type-widening path is used which may be slower.
721

722
!!! warning
723
    The body expression of a threaded comprehension may execute on any thread and may
724
    migrate between threads. Do not rely on [`threadid()`](@ref Threads.threadid) or
725
    [`task_local_storage()`](@ref) returning consistent values within the body expression.
726

727
```julia-repl
728
julia> Threads.@threads [i^2 for i in 1:5] # Simple comprehension
729
5-element Vector{Int64}:
730
   1
731
   4
732
   9
733
  16
734
  25
735

736
julia> Threads.@threads [i^2 for i in 1:5 if iseven(i)] # Filtered comprehension
737
2-element Vector{Int64}:
738
   4
739
  16
740

741
julia> Threads.@threads [i + j for i in 1:3, j in 1:3] # Multiple loops
742
3×3 Matrix{Int64}:
743
 2  3  4
744
 3  4  5
745
 4  5  6
746

747
julia> Threads.@threads Float64[i^2 for i in 1:5] # Typed comprehension
748
5-element Vector{Float64}:
749
  1.0
750
  4.0
751
  9.0
752
 16.0
753
 25.0
754
```
755

756
When the iterator doesn't have a known length, such as a channel, the `:greedy` scheduling
757
option can be used.
758
```julia-repl
759
julia> c = Channel(5, spawn=true) do ch
760
           foreach(i -> put!(ch, i), 1:5)
761
       end;
762

763
julia> Threads.@threads :greedy [i^2 for i in c if iseven(i)]
764
2-element Vector{Int64}:
765
  4
766
 16
767

768
julia> # Non-indexable iterators are also supported
769
       Threads.@threads [i for i in Iterators.flatten([1:3, 4:6])]
770
6-element Vector{Int64}:
771
 1
772
 2
773
 3
774
 4
775
 5
776
 6
777
```
778
"""
779
macro threads(args...)
    # Accepted call shapes: `@threads ex` or `@threads :schedule ex`.
    if length(args) == 1
        sched = :default
        ex = args[1]
    elseif length(args) == 2
        sched_arg, ex = args
        # Only a quoted symbol (e.g. `:static`) selects a schedule. Anything else,
        # including a bare identifier like `static`, becomes `nothing` and is
        # rejected by the check below.
        sched = sched_arg isa QuoteNode ? sched_arg.value : nothing
        if !(sched === :static || sched === :dynamic || sched === :greedy)
            throw(ArgumentError("unsupported schedule argument in @threads"))
        end
    else
        throw(ArgumentError("wrong number of arguments in @threads"))
    end

    if !(ex isa Expr)
        throw(ArgumentError("@threads requires a `for` loop or comprehension expression"))
    end

    head = ex.head
    if head === :typed_comprehension
        # `T[expr for ...]`: args[1] holds the element type, args[2] the generator.
        return _threadsfor_comprehension(ex.args[2], sched, ex.args[1])
    elseif head === :comprehension
        return _threadsfor_comprehension(ex.args[1], sched)
    elseif head === :for
        iterspec = ex.args[1]
        # Exactly one `var = iter` spec is supported. A multi-spec header such as
        # `for i in a, j in b` parses to a `:block` of specs and is rejected here.
        if !(iterspec isa Expr && iterspec.head === :(=))
            throw(ArgumentError("nested outer loops are not currently supported by @threads"))
        end
        return _threadsfor(iterspec, ex.args[2], sched)
    end
    throw(ArgumentError("@threads requires a `for` loop or comprehension expression"))
end
815

816
function _spawn_set_thrpool(t::Task, tp::Symbol)
    requested = _sym_to_tpid(tp)
    # Use the requested pool only if it resolved (not -1) and has at least one
    # thread; otherwise fall back to the :default pool.
    usable = requested != -1 && _nthreads_in_pool(requested) != 0
    tpid = usable ? requested : _sym_to_tpid(:default)
    status = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, tpid)
    @assert status == 1 "_result != 1"
    return nothing
end
825

826
"""
827
    Threads.@spawn [:default|:interactive|:samepool] expr
828

829
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
830
thread in the specified threadpool: `:default`, `:interactive`, or `:samepool`
831
to use the same as the caller. `:default` is used if unspecified. The task is
832
allocated to a thread once one becomes available. To wait for the task to
833
finish, call [`wait`](@ref) on the result of this macro, or call
834
[`fetch`](@ref) to wait and then obtain its return value.
835

836
Values can be interpolated into `@spawn` via `\$`, which copies the value
837
directly into the constructed underlying closure. This allows you to insert
838
the _value_ of a variable, isolating the asynchronous code from changes to
839
the variable's value in the current task.
840

841
!!! note
842
    The thread that the task runs on may change if the task yields, therefore `threadid()` should not
843
    be treated as constant for a task. See [`Task Migration`](@ref man-task-migration), and the broader
844
    [multi-threading](@ref man-multithreading) manual for further important caveats.
845
    See also the chapter on [threadpools](@ref man-threadpools).
846

847
!!! compat "Julia 1.3"
848
    This macro is available as of Julia 1.3.
849

850
!!! compat "Julia 1.4"
851
    Interpolating values via `\$` is available as of Julia 1.4.
852

853
!!! compat "Julia 1.9"
854
    A threadpool may be specified as of Julia 1.9.
855

856
!!! compat "Julia 1.12"
857
    The same threadpool may be specified as of Julia 1.12.
858

859
# Examples
860
```julia-repl
861
julia> t() = println("Hello from ", Threads.threadid());
862

863
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
864
Hello from 1
865
Hello from 1
866
Hello from 3
867
Hello from 4
868
```
869
"""
870
macro spawn(args...)
    # Accepted call shapes: `@spawn expr` or `@spawn pool expr`. A quoted pool name
    # is validated at expansion time. Any other pool expression is evaluated at run
    # time by the generated code.
    if length(args) == 1
        pool_expr = QuoteNode(:default)
        body = args[1]
    elseif length(args) == 2
        pool_expr, body = args
        if pool_expr isa QuoteNode
            pool_name = pool_expr.value
            if !(pool_name in (:interactive, :default, :samepool))
                throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", pool_name)))
            end
            pool_expr = QuoteNode(pool_name)
        end
    else
        throw(ArgumentError("wrong number of arguments in @spawn"))
    end

    # `$`-interpolated values in `body` are lifted into the `let` bindings used below
    # (see `Base._lift_one_interp!`).
    letargs = Base._lift_one_interp!(body)

    thunk = Base.replace_linenums!(:(() -> ($(esc(body)))), __source__)
    syncvar = esc(Base.sync_varname)
    return quote
        let $(letargs...)
            local task = Task($thunk)
            task.sticky = false
            local pool = $(esc(pool_expr))
            # `:samepool` resolves to the caller's threadpool at run time.
            if pool == :samepool
                pool = Threads.threadpool()
            end
            _spawn_set_thrpool(task, pool)
            # If the `Base.sync_varname` variable is a local in scope, hand it the task.
            if $(Expr(:islocal, syncvar))
                put!($syncvar, task)
            end
            schedule(task)
            task
        end
    end
end
911

912
# `foreach` is declared here with no methods so that downstream types (for example
# `Channel`) can add their own overloads.
function foreach end

# Scheduling traits. Downstream overloads can dispatch on these singleton types to
# select a scheduling strategy.
abstract type AbstractSchedule end

struct StaticSchedule <: AbstractSchedule
end

struct FairSchedule <: AbstractSchedule
end
48✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc