• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37845

20 Jul 2024 10:57PM UTC coverage: 86.098% (+0.7%) from 85.368%
#37845

push

local

web-flow
Update stable version number in Readme to v1.10.4 (#55186)

76223 of 88531 relevant lines covered (86.1%)

15332895.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.66
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
"""
7
    Threads.threadid() -> Int
8

9
Get the ID number of the current thread of execution. The master thread has
10
ID `1`.
11

12
# Examples
13
```julia-repl
14
julia> Threads.threadid()
15
1
16

17
julia> Threads.@threads for i in 1:4
18
          println(Threads.threadid())
19
       end
20
4
21
2
22
5
23
4
24
```
25

26
!!! note
27
    The thread that a task runs on may change if the task yields, which is known as [`Task Migration`](@ref man-task-migration).
28
    For this reason in most cases it is not safe to use `threadid()` to index into, say, a vector of buffer or stateful objects.
29

30
"""
31
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)
2,540,396✔
32

33
# lower bound on the largest threadid()
34
"""
35
    Threads.maxthreadid() -> Int
36

37
Get a lower bound on the number of threads (across all thread pools) available
38
to the Julia process, with atomic-acquire semantics. The result will always be
39
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
40
any task you were able to observe before calling `maxthreadid`.
41
"""
42
maxthreadid() = Int(Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire))
73✔
43

44
"""
45
    Threads.nthreads(:default | :interactive) -> Int
46

47
Get the current number of threads within the specified thread pool. The threads in `:interactive`
48
have id numbers `1:nthreads(:interactive)`, and the threads in `:default` have id numbers in
49
`nthreads(:interactive) .+ (1:nthreads(:default))`.
50

51
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
52
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
53
standard library and [`Threads.maxthreadid()`](@ref).
54
"""
55
nthreads(pool::Symbol) = threadpoolsize(pool)
12✔
56

57
function _nthreads_in_pool(tpid::Int8)
58
    p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
27,595✔
59
    return Int(unsafe_load(p, tpid + 1))
27,595✔
60
end
61

62
function _tpid_to_sym(tpid::Int8)
63
    if tpid == 0
12,311✔
64
        return :interactive
9✔
65
    elseif tpid == 1
12,302✔
66
        return :default
12,302✔
67
    elseif tpid == -1
×
68
        return :foreign
×
69
    else
70
        throw(ArgumentError(LazyString("Unrecognized threadpool id ", tpid)))
×
71
    end
72
end
73

74
function _sym_to_tpid(tp::Symbol)
302✔
75
    if tp === :interactive
12,613✔
76
        return Int8(0)
15✔
77
    elseif tp === :default
12,598✔
78
        return Int8(1)
12,597✔
79
    elseif tp == :foreign
1✔
80
        return Int8(-1)
×
81
    else
82
        throw(ArgumentError(LazyString("Unrecognized threadpool name `", tp, "`")))
1✔
83
    end
84
end
85

86
"""
87
    Threads.threadpool(tid = threadid()) -> Symbol
88

89
Returns the specified thread's threadpool; either `:default`, `:interactive`, or `:foreign`.
90
"""
91
function threadpool(tid = threadid())
×
92
    tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
×
93
    return _tpid_to_sym(tpid)
×
94
end
95

96
"""
97
    Threads.nthreadpools() -> Int
98

99
Returns the number of threadpools currently configured.
100
"""
101
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))
×
102

103
"""
104
    Threads.threadpoolsize(pool::Symbol = :default) -> Int
105

106
Get the number of threads available to the default thread pool (or to the
107
specified thread pool).
108

109
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
110
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
111
[`Distributed`](@ref man-distributed) standard library.
112
"""
113
function threadpoolsize(pool::Symbol = :default)
114
    if pool === :default || pool === :interactive
12,388✔
115
        tpid = _sym_to_tpid(pool)
24,622✔
116
    elseif pool == :foreign
×
117
        error("Threadpool size of `:foreign` is indeterminant")
×
118
    else
119
        error("invalid threadpool specified")
×
120
    end
121
    return _nthreads_in_pool(tpid)
24,461✔
122
end
123

124
"""
125
    threadpooltids(pool::Symbol)
126

127
Returns a vector of IDs of threads in the given pool.
128
"""
129
function threadpooltids(pool::Symbol)
×
130
    ni = _nthreads_in_pool(Int8(0))
×
131
    if pool === :interactive
×
132
        return collect(1:ni)
×
133
    elseif pool === :default
×
134
        return collect(ni+1:ni+_nthreads_in_pool(Int8(1)))
×
135
    else
136
        error("invalid threadpool specified")
×
137
    end
138
end
139

140
"""
141
    Threads.ngcthreads() -> Int
142

143
Returns the number of GC threads currently configured.
144
This includes both mark threads and concurrent sweep threads.
145
"""
146
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1
16✔
147

148
function threading_run(fun, static)
×
149
    ccall(:jl_enter_threaded_region, Cvoid, ())
×
150
    n = threadpoolsize()
×
151
    tid_offset = threadpoolsize(:interactive)
×
152
    tasks = Vector{Task}(undef, n)
×
153
    for i = 1:n
×
154
        t = Task(() -> fun(i)) # pass in tid
×
155
        t.sticky = static
×
156
        if static
×
157
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid_offset + i-1)
×
158
        else
159
            # TODO: this should be the current pool (except interactive) if there
160
            # are ever more than two pools.
161
            _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, _sym_to_tpid(:default))
×
162
            @assert _result == 1
×
163
        end
164
        tasks[i] = t
×
165
        schedule(t)
×
166
    end
×
167
    for i = 1:n
×
168
        Base._wait(tasks[i])
×
169
    end
×
170
    ccall(:jl_exit_threaded_region, Cvoid, ())
×
171
    failed_tasks = filter!(istaskfailed, tasks)
×
172
    if !isempty(failed_tasks)
×
173
        throw(CompositeException(map(TaskFailedException, failed_tasks)))
×
174
    end
175
end
176

177
function _threadsfor(iter, lbody, schedule)
×
178
    lidx = iter.args[1]         # index
×
179
    range = iter.args[2]
×
180
    esc_range = esc(range)
×
181
    func = if schedule === :greedy
×
182
        greedy_func(esc_range, lidx, lbody)
×
183
    else
184
        default_func(esc_range, lidx, lbody)
×
185
    end
186
    quote
×
187
        local threadsfor_fun
×
188
        $func
×
189
        if $(schedule === :greedy || schedule === :dynamic || schedule === :default)
×
190
            threading_run(threadsfor_fun, false)
×
191
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
×
192
            error("`@threads :static` cannot be used concurrently or nested")
×
193
        else # :static
194
            threading_run(threadsfor_fun, true)
×
195
        end
196
        nothing
×
197
    end
198
end
199

200
function greedy_func(itr, lidx, lbody)
×
201
    quote
×
202
        let c = Channel{eltype($itr)}(0,spawn=true) do ch
×
203
            for item in $itr
×
204
                put!(ch, item)
×
205
            end
×
206
        end
207
        function threadsfor_fun(tid)
×
208
            for item in c
×
209
                local $(esc(lidx)) = item
×
210
                $(esc(lbody))
×
211
            end
×
212
        end
213
        end
214
    end
215
end
216

217
function default_func(itr, lidx, lbody)
×
218
    quote
×
219
        let range = $itr
×
220
        function threadsfor_fun(tid = 1; onethread = false)
×
221
            r = range # Load into local variable
×
222
            lenr = length(r)
×
223
            # divide loop iterations among threads
224
            if onethread
×
225
                tid = 1
×
226
                len, rem = lenr, 0
×
227
            else
228
                len, rem = divrem(lenr, threadpoolsize())
×
229
            end
230
            # not enough iterations for all the threads?
231
            if len == 0
×
232
                if tid > rem
×
233
                    return
×
234
                end
235
                len, rem = 1, 0
×
236
            end
237
            # compute this thread's iterations
238
            f = firstindex(r) + ((tid-1) * len)
×
239
            l = f + len - 1
×
240
            # distribute remaining iterations evenly
241
            if rem > 0
×
242
                if tid <= rem
×
243
                    f = f + (tid-1)
×
244
                    l = l + tid
×
245
                else
246
                    f = f + rem
×
247
                    l = l + rem
×
248
                end
249
            end
250
            # run this thread's iterations
251
            for i = f:l
×
252
                local $(esc(lidx)) = @inbounds r[i]
×
253
                $(esc(lbody))
×
254
            end
×
255
        end
256
        end
257
    end
258
end
259

260
"""
261
    Threads.@threads [schedule] for ... end
262

263
A macro to execute a `for` loop in parallel. The iteration space is distributed to
264
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
265
execution of the loop waits for the evaluation of all iterations.
266

267
See also: [`@spawn`](@ref Threads.@spawn) and
268
`pmap` in [`Distributed`](@ref man-distributed).
269

270
# Extended help
271

272
## Semantics
273

274
Unless stronger guarantees are specified by the scheduling option, the loop executed by
275
`@threads` macro have the following semantics.
276

277
The `@threads` macro executes the loop body in an unspecified order and potentially
278
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
279
The assignments can be different for each execution. The loop body code (including any code
280
transitively called from it) must not make any assumptions about the distribution of
281
iterations to tasks or the worker thread in which they are executed. The loop body for each
282
iteration must be able to make forward progress independent of other iterations and be free
283
from data races. As such, invalid synchronizations across iterations may deadlock while
284
unsynchronized memory accesses may result in undefined behavior.
285

286
For example, the above conditions imply that:
287

288
- A lock taken in an iteration *must* be released within the same iteration.
289
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
290
- Write only to locations not shared across iterations (unless a lock or atomic operation is
291
  used).
292
- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
293
  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
294

295
## Schedulers
296

297
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
298
releases. Currently, `:dynamic` is used when the scheduler is not specified.
299

300
!!! compat "Julia 1.5"
301
    The `schedule` argument is available as of Julia 1.5.
302

303
### `:dynamic` (default)
304

305
`:dynamic` scheduler executes iterations dynamically to available worker threads. Current
306
implementation assumes that the workload for each iteration is uniform. However, this
307
assumption may be removed in the future.
308

309
This scheduling option is merely a hint to the underlying execution mechanism. However, a
310
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
311
bounded by a small constant multiple of the number of available worker threads
312
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
313
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
314
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
315
larger than the number of the worker threads and the run-time of `f(x)` is relatively
316
smaller than the cost of spawning and synchronizing a task (typically less than 10
317
microseconds).
318

319
!!! compat "Julia 1.8"
320
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
321

322
### `:greedy`
323

324
`:greedy` scheduler spawns up to [`Threads.threadpoolsize()`](@ref) tasks, each greedily working on
325
the given iterated values as they are produced. As soon as one task finishes its work, it takes
326
the next value from the iterator. Work done by any individual task is not necessarily on
327
contiguous values from the iterator. The given iterator may produce values forever, only the
328
iterator interface is required (no indexing).
329

330
This scheduling option is generally a good choice if the workload of individual iterations
331
is not uniform/has a large spread.
332

333
!!! compat "Julia 1.11"
334
    The `:greedy` option for the `schedule` argument is available as of Julia 1.11.
335

336
### `:static`
337

338
`:static` scheduler creates one task per thread and divides the iterations equally among
339
them, assigning each task specifically to each thread. In particular, the value of
340
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
341
Specifying `:static` is an error if used from inside another `@threads` loop or from a
342
thread other than 1.
343

344
!!! note
345
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
346
    In newly written library functions, `:static` scheduling is discouraged because the
347
    functions using this option cannot be called from arbitrary worker threads.
348

349
## Examples
350

351
To illustrate of the different scheduling strategies, consider the following function
352
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
353

354
```julia-repl
355
julia> function busywait(seconds)
356
            tstart = time_ns()
357
            while (time_ns() - tstart) / 1e9 < seconds
358
            end
359
        end
360

361
julia> @time begin
362
            Threads.@spawn busywait(5)
363
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
364
                busywait(1)
365
            end
366
        end
367
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
368

369
julia> @time begin
370
            Threads.@spawn busywait(5)
371
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
372
                busywait(1)
373
            end
374
        end
375
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
376
```
377

378
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
379
to run two of the 1-second iterations to complete the for loop.
380
"""
381
macro threads(args...)
382
    na = length(args)
383
    if na == 2
384
        sched, ex = args
385
        if sched isa QuoteNode
386
            sched = sched.value
387
        elseif sched isa Symbol
388
            # for now only allow quoted symbols
389
            sched = nothing
390
        end
391
        if sched !== :static && sched !== :dynamic && sched !== :greedy
392
            throw(ArgumentError("unsupported schedule argument in @threads"))
393
        end
394
    elseif na == 1
395
        sched = :default
396
        ex = args[1]
397
    else
398
        throw(ArgumentError("wrong number of arguments in @threads"))
399
    end
400
    if !(isa(ex, Expr) && ex.head === :for)
401
        throw(ArgumentError("@threads requires a `for` loop expression"))
402
    end
403
    if !(ex.args[1] isa Expr && ex.args[1].head === :(=))
404
        throw(ArgumentError("nested outer loops are not currently supported by @threads"))
405
    end
406
    return _threadsfor(ex.args[1], ex.args[2], sched)
407
end
408

409
function _spawn_set_thrpool(t::Task, tp::Symbol)
2✔
410
    tpid = _sym_to_tpid(tp)
2,078✔
411
    if tpid == -1 || _nthreads_in_pool(tpid) == 0
2,849✔
412
        tpid = _sym_to_tpid(:default)
×
413
    end
414
    _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, tpid)
2,846✔
415
    @assert _result == 1
2,846✔
416
    nothing
2,071✔
417
end
418

419
"""
420
    Threads.@spawn [:default|:interactive] expr
421

422
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
423
thread in the specified threadpool (`:default` if unspecified). The task is
424
allocated to a thread once one becomes available. To wait for the task to
425
finish, call [`wait`](@ref) on the result of this macro, or call
426
[`fetch`](@ref) to wait and then obtain its return value.
427

428
Values can be interpolated into `@spawn` via `\$`, which copies the value
429
directly into the constructed underlying closure. This allows you to insert
430
the _value_ of a variable, isolating the asynchronous code from changes to
431
the variable's value in the current task.
432

433
!!! note
434
    The thread that the task runs on may change if the task yields, therefore `threadid()` should not
435
    be treated as constant for a task. See [`Task Migration`](@ref man-task-migration), and the broader
436
    [multi-threading](@ref man-multithreading) manual for further important caveats.
437
    See also the chapter on [threadpools](@ref man-threadpools).
438

439
!!! compat "Julia 1.3"
440
    This macro is available as of Julia 1.3.
441

442
!!! compat "Julia 1.4"
443
    Interpolating values via `\$` is available as of Julia 1.4.
444

445
!!! compat "Julia 1.9"
446
    A threadpool may be specified as of Julia 1.9.
447

448
# Examples
449
```julia-repl
450
julia> t() = println("Hello from ", Threads.threadid());
451

452
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
453
Hello from 1
454
Hello from 1
455
Hello from 3
456
Hello from 4
457
```
458
"""
459
macro spawn(args...)
12✔
460
    tp = QuoteNode(:default)
12✔
461
    na = length(args)
12✔
462
    if na == 2
12✔
463
        ttype, ex = args
×
464
        if ttype isa QuoteNode
×
465
            ttype = ttype.value
×
466
            if ttype !== :interactive && ttype !== :default
×
467
                throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", ttype)))
×
468
            end
469
            tp = QuoteNode(ttype)
×
470
        else
471
            tp = ttype
×
472
        end
473
    elseif na == 1
12✔
474
        ex = args[1]
12✔
475
    else
476
        throw(ArgumentError("wrong number of arguments in @spawn"))
×
477
    end
478

479
    letargs = Base._lift_one_interp!(ex)
12✔
480

481
    thunk = Base.replace_linenums!(:(()->($(esc(ex)))), __source__)
12✔
482
    var = esc(Base.sync_varname)
12✔
483
    quote
12✔
484
        let $(letargs...)
485
            local task = Task($thunk)
2,841✔
486
            task.sticky = false
2,841✔
487
            _spawn_set_thrpool(task, $(esc(tp)))
2,841✔
488
            if $(Expr(:islocal, var))
2,069✔
489
                put!($var, task)
2,059✔
490
            end
491
            schedule(task)
2,841✔
492
            task
2,072✔
493
        end
494
    end
495
end
496

497
# This is a stub that can be overloaded for downstream structures like `Channel`
498
function foreach end
499

500
# Scheduling traits that can be employed for downstream overloads
501
abstract type AbstractSchedule end
502
struct StaticSchedule <: AbstractSchedule end
503
struct FairSchedule <: AbstractSchedule end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc