• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37602

21 Aug 2023 07:21PM UTC coverage: 86.402% (-0.1%) from 86.512%
#37602

push

local

web-flow
Add examples of simple use to `Ref` docs (#50995)

The docs didn't document the simple getting and setting behaviour of
`Ref`s.

73366 of 84912 relevant lines covered (86.4%)

12351763.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.67
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
"""
7
    Threads.threadid() -> Int
8

9
Get the ID number of the current thread of execution. The master thread has
10
ID `1`.
11

12
# Examples
13
```julia-repl
14
julia> Threads.threadid()
15
1
16

17
julia> Threads.@threads for i in 1:4
18
          println(Threads.threadid())
19
       end
20
4
21
2
22
5
23
4
24
```
25

26
!!! note
27
    The thread that a task runs on may change if the task yields, which is known as [`Task Migration`](@ref man-task-migration).
28
    For this reason in most cases it is not safe to use `threadid()` to index into, say, a vector of buffer or stateful objects.
29

30
"""
31
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)
24,697,805✔
32

33
# lower bound on the largest threadid()
34
"""
35
    Threads.maxthreadid() -> Int
36

37
Get a lower bound on the number of threads (across all thread pools) available
38
to the Julia process, with atomic-acquire semantics. The result will always be
39
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
40
any task you were able to observe before calling `maxthreadid`.
41
"""
42
maxthreadid() = Int(Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire))
102✔
43

44
"""
45
    Threads.nthreads(:default | :interactive) -> Int
46

47
Get the current number of threads within the specified thread pool. The threads in default
48
have id numbers `1:nthreads(:default)`.
49

50
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
51
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
52
standard library and [`Threads.maxthreadid()`](@ref).
53
"""
54
nthreads(pool::Symbol) = threadpoolsize(pool)
×
55

56
function _nthreads_in_pool(tpid::Int8)
2,092,316✔
57
    p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
25,832,971✔
58
    return Int(unsafe_load(p, tpid + 1))
25,834,322✔
59
end
60

61
function _tpid_to_sym(tpid::Int8)
7✔
62
    if tpid == 0
8,735,667✔
63
        return :interactive
13✔
64
    elseif tpid == 1
8,736,285✔
65
        return :default
8,736,438✔
66
    elseif tpid == -1
×
67
        return :foreign
×
68
    else
69
        throw(ArgumentError("Unrecognized threadpool id $tpid"))
×
70
    end
71
end
72

73
function _sym_to_tpid(tp::Symbol)
3,302,204✔
74
    if tp === :interactive
18,314,935✔
75
        return Int8(0)
440,642✔
76
    elseif tp === :default
17,874,890✔
77
        return Int8(1)
17,874,766✔
78
    elseif tp == :foreign
2✔
79
        return Int8(-1)
×
80
    else
81
        throw(ArgumentError("Unrecognized threadpool name `$(repr(tp))`"))
2✔
82
    end
83
end
84

85
"""
86
    Threads.threadpool(tid = threadid()) -> Symbol
87

88
Returns the specified thread's threadpool; either `:default`, `:interactive`, or `:foreign`.
89
"""
90
function threadpool(tid = threadid())
13✔
91
    tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
16✔
92
    return _tpid_to_sym(tpid)
10✔
93
end
94

95
"""
96
    Threads.nthreadpools() -> Int
97

98
Returns the number of threadpools currently configured.
99
"""
100
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))
1✔
101

102
"""
103
    Threads.threadpoolsize(pool::Symbol = :default) -> Int
104

105
Get the number of threads available to the default thread pool (or to the
106
specified thread pool).
107

108
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
109
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
110
[`Distributed`](@ref man-distributed) standard library.
111
"""
112
function threadpoolsize(pool::Symbol = :default)
3,743,718✔
113
    if pool === :default || pool === :interactive
12,476,215✔
114
        tpid = _sym_to_tpid(pool)
19,559,907✔
115
    elseif pool == :foreign
×
116
        error("Threadpool size of `:foreign` is indeterminant")
×
117
    else
118
        error("invalid threadpool specified")
×
119
    end
120
    return _nthreads_in_pool(tpid)
13,914,717✔
121
end
122

123
"""
124
    threadpooltids(pool::Symbol)
125

126
Returns a vector of IDs of threads in the given pool.
127
"""
128
function threadpooltids(pool::Symbol)
2✔
129
    ni = _nthreads_in_pool(Int8(0))
2✔
130
    if pool === :interactive
2✔
131
        return collect(1:ni)
1✔
132
    elseif pool === :default
1✔
133
        return collect(ni+1:ni+_nthreads_in_pool(Int8(1)))
1✔
134
    else
135
        error("invalid threadpool specified")
×
136
    end
137
end
138

139
"""
140
    Threads.ngcthreads() -> Int
141

142
Returns the number of GC threads currently configured.
143
This includes both mark threads and concurrent sweep threads.
144
"""
145
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1
10✔
146

147
function threading_run(fun, static)
440,632✔
148
    ccall(:jl_enter_threaded_region, Cvoid, ())
440,632✔
149
    n = threadpoolsize()
440,632✔
150
    tid_offset = threadpoolsize(:interactive)
440,632✔
151
    tasks = Vector{Task}(undef, n)
440,632✔
152
    for i = 1:n
881,263✔
153
        t = Task(() -> fun(i)) # pass in tid
2,421,915✔
154
        t.sticky = static
1,211,743✔
155
        if static
1,211,741✔
156
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid_offset + i-1)
66✔
157
        else
158
            # TODO: this should be the current pool (except interactive) if there
159
            # are ever more than two pools.
160
            @assert ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, _sym_to_tpid(:default)) == 1
1,211,675✔
161
        end
162
        tasks[i] = t
1,211,742✔
163
        schedule(t)
1,211,742✔
164
    end
1,982,845✔
165
    for i = 1:n
881,264✔
166
        Base._wait(tasks[i])
1,211,743✔
167
    end
1,982,853✔
168
    ccall(:jl_exit_threaded_region, Cvoid, ())
440,632✔
169
    failed_tasks = filter!(istaskfailed, tasks)
440,632✔
170
    if !isempty(failed_tasks)
440,632✔
171
        throw(CompositeException(map(TaskFailedException, failed_tasks)))
12✔
172
    end
173
end
174

175
function _threadsfor(iter, lbody, schedule)
138✔
176
    lidx = iter.args[1]         # index
138✔
177
    range = iter.args[2]
138✔
178
    quote
138✔
179
        local threadsfor_fun
×
180
        let range = $(esc(range))
440,641✔
181
        function threadsfor_fun(tid = 1; onethread = false)
2,863,308✔
182
            r = range # Load into local variable
1,211,437✔
183
            lenr = length(r)
1,211,446✔
184
            # divide loop iterations among threads
185
            if onethread
1,210,520✔
186
                tid = 1
×
187
                len, rem = lenr, 0
×
188
            else
189
                len, rem = divrem(lenr, threadpoolsize())
1,210,755✔
190
            end
191
            # not enough iterations for all the threads?
192
            if len == 0
1,210,921✔
193
                if tid > rem
67✔
194
                    return
40✔
195
                end
196
                len, rem = 1, 0
27✔
197
            end
198
            # compute this thread's iterations
199
            f = firstindex(r) + ((tid-1) * len)
1,210,920✔
200
            l = f + len - 1
1,210,334✔
201
            # distribute remaining iterations evenly
202
            if rem > 0
1,210,112✔
203
                if tid <= rem
58✔
204
                    f = f + (tid-1)
25✔
205
                    l = l + tid
25✔
206
                else
207
                    f = f + rem
33✔
208
                    l = l + rem
33✔
209
                end
210
            end
211
            # run this thread's iterations
212
            for i = f:l
2,419,053✔
213
                local $(esc(lidx)) = @inbounds r[i]
433,854,394✔
214
                $(esc(lbody))
1,086,025✔
215
            end
434,758,155✔
216
        end
217
        end
218
        if $(schedule === :dynamic || schedule === :default)
440,635✔
219
            threading_run(threadsfor_fun, false)
440,608✔
220
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
28✔
221
            error("`@threads :static` cannot be used concurrently or nested")
4✔
222
        else # :static
223
            threading_run(threadsfor_fun, true)
24✔
224
        end
225
        nothing
440,619✔
226
    end
227
end
228

229
"""
230
    Threads.@threads [schedule] for ... end
231

232
A macro to execute a `for` loop in parallel. The iteration space is distributed to
233
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
234
execution of the loop waits for the evaluation of all iterations.
235

236
See also: [`@spawn`](@ref Threads.@spawn) and
237
`pmap` in [`Distributed`](@ref man-distributed).
238

239
# Extended help
240

241
## Semantics
242

243
Unless stronger guarantees are specified by the scheduling option, the loop executed by
244
`@threads` macro have the following semantics.
245

246
The `@threads` macro executes the loop body in an unspecified order and potentially
247
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
248
The assignments can be different for each execution. The loop body code (including any code
249
transitively called from it) must not make any assumptions about the distribution of
250
iterations to tasks or the worker thread in which they are executed. The loop body for each
251
iteration must be able to make forward progress independent of other iterations and be free
252
from data races. As such, invalid synchronizations across iterations may deadlock while
253
unsynchronized memory accesses may result in undefined behavior.
254

255
For example, the above conditions imply that:
256

257
- A lock taken in an iteration *must* be released within the same iteration.
258
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
259
- Write only to locations not shared across iterations (unless a lock or atomic operation is
260
  used).
261
- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
262
  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
263

264
## Schedulers
265

266
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
267
releases. Currently, `:dynamic` is used when the scheduler is not specified.
268

269
!!! compat "Julia 1.5"
270
    The `schedule` argument is available as of Julia 1.5.
271

272
### `:dynamic` (default)
273

274
`:dynamic` scheduler executes iterations dynamically to available worker threads. Current
275
implementation assumes that the workload for each iteration is uniform. However, this
276
assumption may be removed in the future.
277

278
This scheduling option is merely a hint to the underlying execution mechanism. However, a
279
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
280
bounded by a small constant multiple of the number of available worker threads
281
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
282
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
283
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
284
larger than the number of the worker threads and the run-time of `f(x)` is relatively
285
smaller than the cost of spawning and synchronizing a task (typically less than 10
286
microseconds).
287

288
!!! compat "Julia 1.8"
289
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
290

291
### `:static`
292

293
`:static` scheduler creates one task per thread and divides the iterations equally among
294
them, assigning each task specifically to each thread. In particular, the value of
295
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
296
Specifying `:static` is an error if used from inside another `@threads` loop or from a
297
thread other than 1.
298

299
!!! note
300
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
301
    In newly written library functions, `:static` scheduling is discouraged because the
302
    functions using this option cannot be called from arbitrary worker threads.
303

304
## Example
305

306
To illustrate of the different scheduling strategies, consider the following function
307
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
308

309
```julia-repl
310
julia> function busywait(seconds)
311
            tstart = time_ns()
312
            while (time_ns() - tstart) / 1e9 < seconds
313
            end
314
        end
315

316
julia> @time begin
317
            Threads.@spawn busywait(5)
318
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
319
                busywait(1)
320
            end
321
        end
322
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
323

324
julia> @time begin
325
            Threads.@spawn busywait(5)
326
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
327
                busywait(1)
328
            end
329
        end
330
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
331
```
332

333
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
334
to run two of the 1-second iterations to complete the for loop.
335
"""
336
macro threads(args...)
145✔
337
    na = length(args)
145✔
338
    if na == 2
145✔
339
        sched, ex = args
37✔
340
        if sched isa QuoteNode
37✔
341
            sched = sched.value
36✔
342
        elseif sched isa Symbol
1✔
343
            # for now only allow quoted symbols
344
            sched = nothing
×
345
        end
346
        if sched !== :static && sched !== :dynamic
37✔
347
            throw(ArgumentError("unsupported schedule argument in @threads"))
1✔
348
        end
349
    elseif na == 1
108✔
350
        sched = :default
108✔
351
        ex = args[1]
108✔
352
    else
353
        throw(ArgumentError("wrong number of arguments in @threads"))
×
354
    end
355
    if !(isa(ex, Expr) && ex.head === :for)
145✔
356
        throw(ArgumentError("@threads requires a `for` loop expression"))
2✔
357
    end
358
    if !(ex.args[1] isa Expr && ex.args[1].head === :(=))
142✔
359
        throw(ArgumentError("nested outer loops are not currently supported by @threads"))
4✔
360
    end
361
    return _threadsfor(ex.args[1], ex.args[2], sched)
138✔
362
end
363

364
function _spawn_set_thrpool(t::Task, tp::Symbol)
6,281,082✔
365
    tpid = _sym_to_tpid(tp)
12,561,728✔
366
    if tpid == -1 || _nthreads_in_pool(tpid) == 0
12,561,364✔
367
        tpid = _sym_to_tpid(:default)
×
368
    end
369
    @assert ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, tpid) == 1
6,281,167✔
370
    nothing
6,281,069✔
371
end
372

373
"""
374
    Threads.@spawn [:default|:interactive] expr
375

376
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
377
thread in the specified threadpool (`:default` if unspecified). The task is
378
allocated to a thread once one becomes available. To wait for the task to
379
finish, call [`wait`](@ref) on the result of this macro, or call
380
[`fetch`](@ref) to wait and then obtain its return value.
381

382
Values can be interpolated into `@spawn` via `\$`, which copies the value
383
directly into the constructed underlying closure. This allows you to insert
384
the _value_ of a variable, isolating the asynchronous code from changes to
385
the variable's value in the current task.
386

387
!!! note
388
    The thread that the task runs on may change if the task yields, therefore `threadid()` should not
389
    be treated as constant for a task. See [`Task Migration`](@ref man-task-migration), and the broader
390
    [multi-threading](@ref man-multithreading) manual for further important caveats.
391
    See also the chapter on [threadpools](@ref man-threadpools).
392

393
!!! compat "Julia 1.3"
394
    This macro is available as of Julia 1.3.
395

396
!!! compat "Julia 1.4"
397
    Interpolating values via `\$` is available as of Julia 1.4.
398

399
!!! compat "Julia 1.9"
400
    A threadpool may be specified as of Julia 1.9.
401

402
# Examples
403
```julia-repl
404
julia> t() = println("Hello from ", Threads.threadid());
405

406
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
407
Hello from 1
408
Hello from 1
409
Hello from 3
410
Hello from 4
411
```
412
"""
413
macro spawn(args...)
117✔
414
    tp = QuoteNode(:default)
117✔
415
    na = length(args)
117✔
416
    if na == 2
117✔
417
        ttype, ex = args
5✔
418
        if ttype isa QuoteNode
5✔
419
            ttype = ttype.value
2✔
420
            if ttype !== :interactive && ttype !== :default
2✔
421
                throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
×
422
            end
423
            tp = QuoteNode(ttype)
2✔
424
        else
425
            tp = ttype
8✔
426
        end
427
    elseif na == 1
112✔
428
        ex = args[1]
112✔
429
    else
430
        throw(ArgumentError("wrong number of arguments in @spawn"))
×
431
    end
432

433
    letargs = Base._lift_one_interp!(ex)
117✔
434

435
    thunk = Base.replace_linenums!(:(()->($(esc(ex)))), __source__)
117✔
436
    var = esc(Base.sync_varname)
117✔
437
    quote
117✔
438
        let $(letargs...)
2,276✔
439
            local task = Task($thunk)
6,280,853✔
440
            task.sticky = false
6,281,418✔
441
            _spawn_set_thrpool(task, $(esc(tp)))
6,281,030✔
442
            if $(Expr(:islocal, var))
6,279,933✔
443
                put!($var, task)
5,000,909✔
444
            end
445
            schedule(task)
6,279,890✔
446
            task
6,278,472✔
447
        end
448
    end
449
end
450

451
# This is a stub that can be overloaded for downstream structures like `Channel`
452
function foreach end
453

454
# Scheduling traits that can be employed for downstream overloads
455
abstract type AbstractSchedule end
456
struct StaticSchedule <: AbstractSchedule end
8✔
457
struct FairSchedule <: AbstractSchedule end
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc