• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37505

pending completion
#37505

push

local

web-flow
Rename `ipython_mode` to `numbered_prompt` (#49314)


Co-authored-by: Jeff Bezanson <jeff.bezanson@gmail.com>
Co-authored-by: Kristoffer Carlsson <kcarlsson89@gmail.com>

1 of 1 new or added line in 1 file covered. (100.0%)

70164 of 83158 relevant lines covered (84.37%)

31979364.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.97
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
"""
7
    Threads.threadid() -> Int
8

9
Get the ID number of the current thread of execution. The master thread has
10
ID `1`.
11
"""
12
function threadid()
    # the value returned by jl_threadid (an Int16) is shifted up by one
    raw = ccall(:jl_threadid, Int16, ())
    return Int(raw + 1)
end
17,817,177✔
13

14
# lower bound on the largest threadid()
15
"""
16
    Threads.maxthreadid() -> Int
17

18
Get a lower bound on the number of threads (across all thread pools) available
19
to the Julia process, with atomic-acquire semantics. The result will always be
20
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
21
any task you were able to observe before calling `maxthreadid`.
22
"""
23
function maxthreadid()
    # address of the runtime's jl_n_threads counter, read with :acquire ordering
    counter = cglobal(:jl_n_threads, Cint)
    return Int(Core.Intrinsics.atomic_pointerref(counter, :acquire))
end
20✔
24

25
"""
26
    Threads.nthreads(:default | :interactive) -> Int
27

28
Get the current number of threads within the specified thread pool. The threads in default
29
have id numbers `nthreads(:interactive) .+ (1:nthreads(:default))`.
30

31
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
32
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
33
standard library and [`Threads.maxthreadid()`](@ref).
34
"""
35
function nthreads(pool::Symbol)
    # thin alias: validation and lookup both happen in threadpoolsize
    return threadpoolsize(pool)
end
×
36

37
function _nthreads_in_pool(tpid::Int8)
    # jl_n_threads_per_pool is read as a global holding a pointer to Cint entries
    counts = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
    # unsafe_load indexes from 1, so the pool id is shifted up by one
    slot = tpid + 1
    return Int(unsafe_load(counts, slot))
end
41

42
function _tpid_to_sym(tpid::Int8)
    # id 0 names the interactive pool; any other id is reported as :default
    if tpid == 0
        return :interactive
    else
        return :default
    end
end
45

46
function _sym_to_tpid(tp::Symbol)
    # only :interactive maps to id 0; every other symbol yields id 1
    if tp === :interactive
        return Int8(0)
    end
    return Int8(1)
end
49

50
"""
51
    Threads.threadpool(tid = threadid()) -> Symbol
52

53
Returns the specified thread's threadpool; either `:default` or `:interactive`.
54
"""
55
function threadpool(tid = threadid())
    # jl_threadpoolid is called with tid - 1 (as Int16) and answers with an Int8 pool id
    pool_id = ccall(:jl_threadpoolid, Int8, (Int16,), tid - 1)
    pool_name = _tpid_to_sym(pool_id)
    return pool_name
end
59

60
"""
61
    Threads.nthreadpools() -> Int
62

63
Returns the number of threadpools currently configured.
64
"""
65
function nthreadpools()
    # plain (non-atomic) read of the runtime's jl_n_threadpools counter
    counter = cglobal(:jl_n_threadpools, Cint)
    return Int(unsafe_load(counter))
end
1✔
66

67
"""
68
    Threads.threadpoolsize(pool::Symbol = :default) -> Int
69

70
Get the number of threads available to the default thread pool (or to the
71
specified thread pool).
72

73
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
74
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
75
[`Distributed`](@ref man-distributed) standard library.
76
"""
77
function threadpoolsize(pool::Symbol = :default)
    # reject anything other than the two known pool names up front
    if !(pool === :default || pool === :interactive)
        error("invalid threadpool specified")
    end
    return _nthreads_in_pool(_sym_to_tpid(pool))
end
85

86
"""
87
    threadpooltids(pool::Symbol)
88

89
Returns a vector of IDs of threads in the given pool.
90
"""
91
function threadpooltids(pool::Symbol)
    # the interactive pool's size is needed for both valid answers: it is the
    # length of the interactive id range and the offset of the default one
    n_interactive = _nthreads_in_pool(Int8(0))
    pool === :interactive && return collect(1:n_interactive)
    pool === :default || error("invalid threadpool specified")
    n_default = _nthreads_in_pool(Int8(1))
    return collect((n_interactive + 1):(n_interactive + n_default))
end
101

102
# Run `fun(i)` for `i = 1:threadpoolsize()`, one task per value, and wait for all
# of them. When `static` is true each task is made sticky and given an explicit
# thread id. Throws a `CompositeException` if any task failed.
function threading_run(fun, static)
    ccall(:jl_enter_threaded_region, Cvoid, ())
    ntasks = threadpoolsize()
    # thread ids handed to jl_set_task_tid are offset by the interactive pool's size
    first_tid = threadpoolsize(:interactive)
    tasks = Vector{Task}(undef, ntasks)
    for slot in 1:ntasks
        task = Task(() -> fun(slot)) # `fun` receives the 1-based slot number
        task.sticky = static
        if static
            ccall(:jl_set_task_tid, Cint, (Any, Cint), task, first_tid + slot - 1)
        end
        tasks[slot] = task
        schedule(task)
    end
    # wait for every task before leaving the threaded region
    for spawned in tasks
        Base._wait(spawned)
    end
    ccall(:jl_exit_threaded_region, Cvoid, ())
    failed = filter(istaskfailed, tasks)
    isempty(failed) || throw(CompositeException(map(TaskFailedException, failed)))
    return nothing
end
123

124
# Build the expansion of `@threads`: a closure that runs one contiguous chunk of
# the iteration space for a given `tid`, followed by the call that launches it
# through `threading_run` according to `schedule`.
function _threadsfor(iter, lbody, schedule)
    loopvar = iter.args[1]      # left-hand side of the `for` header
    iterable = iter.args[2]     # right-hand side of the `for` header
    # decided at expansion time and spliced into the result as a literal Bool
    unpinned = schedule === :dynamic || schedule === :default
    quote
        local threadsfor_fun
        let captured = $(esc(iterable))
        function threadsfor_fun(tid = 1; onethread = false)
            itr = captured # Load into local variable
            total = length(itr)
            # divide loop iterations among threads
            if onethread
                tid = 1
                chunk, extra = total, 0
            else
                chunk, extra = divrem(total, threadpoolsize())
            end
            # not enough iterations for all the threads?
            if chunk == 0
                tid > extra && return
                chunk, extra = 1, 0
            end
            # compute this thread's iterations
            lo = firstindex(itr) + ((tid - 1) * chunk)
            hi = lo + chunk - 1
            # distribute remaining iterations evenly
            if extra > 0
                if tid <= extra
                    lo += tid - 1
                    hi += tid
                else
                    lo += extra
                    hi += extra
                end
            end
            # run this thread's iterations
            for k = lo:hi
                local $(esc(loopvar)) = @inbounds itr[k]
                $(esc(lbody))
            end
        end
        end
        if $unpinned
            threading_run(threadsfor_fun, false)
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
            error("`@threads :static` cannot be used concurrently or nested")
        else # :static
            threading_run(threadsfor_fun, true)
        end
        nothing
    end
end
177

178
"""
179
    Threads.@threads [schedule] for ... end
180

181
A macro to execute a `for` loop in parallel. The iteration space is distributed to
182
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
183
execution of the loop waits for the evaluation of all iterations.
184

185
See also: [`@spawn`](@ref Threads.@spawn) and
186
`pmap` in [`Distributed`](@ref man-distributed).
187

188
# Extended help
189

190
## Semantics
191

192
Unless stronger guarantees are specified by the scheduling option, the loop executed by
193
the `@threads` macro has the following semantics.
194

195
The `@threads` macro executes the loop body in an unspecified order and potentially
196
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
197
The assignments can be different for each execution. The loop body code (including any code
198
transitively called from it) must not make any assumptions about the distribution of
199
iterations to tasks or the worker thread in which they are executed. The loop body for each
200
iteration must be able to make forward progress independent of other iterations and be free
201
from data races. As such, invalid synchronizations across iterations may deadlock while
202
unsynchronized memory accesses may result in undefined behavior.
203

204
For example, the above conditions imply that:
205

206
- A lock taken in an iteration *must* be released within the same iteration.
207
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
208
- Write only to locations not shared across iterations (unless a lock or atomic operation is
209
  used).
210
- The value of [`threadid()`](@ref Threads.threadid) may change even within a single
211
  iteration.
212

213
## Schedulers
214

215
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
216
releases. Currently, `:dynamic` is used when the scheduler is not specified.
217

218
!!! compat "Julia 1.5"
219
    The `schedule` argument is available as of Julia 1.5.
220

221
### `:dynamic` (default)
222

223
`:dynamic` scheduler executes iterations dynamically to available worker threads. Current
224
implementation assumes that the workload for each iteration is uniform. However, this
225
assumption may be removed in the future.
226

227
This scheduling option is merely a hint to the underlying execution mechanism. However, a
228
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
229
bounded by a small constant multiple of the number of available worker threads
230
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
231
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
232
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
233
larger than the number of the worker threads and the run-time of `f(x)` is relatively
234
smaller than the cost of spawning and synchronizing a task (typically less than 10
235
microseconds).
236

237
!!! compat "Julia 1.8"
238
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
239

240
### `:static`
241

242
`:static` scheduler creates one task per thread and divides the iterations equally among
243
them, assigning each task specifically to each thread. In particular, the value of
244
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
245
Specifying `:static` is an error if used from inside another `@threads` loop or from a
246
thread other than 1.
247

248
!!! note
249
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
250
    In newly written library functions, `:static` scheduling is discouraged because the
251
    functions using this option cannot be called from arbitrary worker threads.
252

253
## Example
254

255
To illustrate the different scheduling strategies, consider the following function
256
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
257

258
```julia-repl
259
julia> function busywait(seconds)
260
            tstart = time_ns()
261
            while (time_ns() - tstart) / 1e9 < seconds
262
            end
263
        end
264

265
julia> @time begin
266
            Threads.@spawn busywait(5)
267
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
268
                busywait(1)
269
            end
270
        end
271
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
272

273
julia> @time begin
274
            Threads.@spawn busywait(5)
275
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
276
                busywait(1)
277
            end
278
        end
279
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
280
```
281

282
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
283
to run two of the 1-second iterations to complete the for loop.
284
"""
285
macro threads(args...)
    nargs = length(args)
    if nargs == 1
        sched = :default
        ex = args[1]
    elseif nargs == 2
        sched, ex = args
        # for now only allow quoted symbols: a bare symbol is turned into
        # `nothing`, which the check below rejects
        sched = sched isa QuoteNode ? sched.value : (sched isa Symbol ? nothing : sched)
        if !(sched === :static || sched === :dynamic)
            throw(ArgumentError("unsupported schedule argument in @threads"))
        end
    else
        throw(ArgumentError("wrong number of arguments in @threads"))
    end
    (ex isa Expr && ex.head === :for) ||
        throw(ArgumentError("@threads requires a `for` loop expression"))
    # the loop header must be a single `var = iterable` assignment
    header = ex.args[1]
    (header isa Expr && header.head === :(=)) ||
        throw(ArgumentError("nested outer loops are not currently supported by @threads"))
    return _threadsfor(header, ex.args[2], sched)
end
312

313
# Assign task `t` to the threadpool named by `tp`, substituting the default
# pool's id when the requested pool reports zero threads.
function _spawn_set_thrpool(t::Task, tp::Symbol)
    requested = _sym_to_tpid(tp)
    tpid = _nthreads_in_pool(requested) == 0 ? _sym_to_tpid(:default) : requested
    ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, tpid)
    return nothing
end
321

322
"""
323
    Threads.@spawn [:default|:interactive] expr
324

325
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
326
thread in the specified threadpool (`:default` if unspecified). The task is
327
allocated to a thread once one becomes available. To wait for the task to
328
finish, call [`wait`](@ref) on the result of this macro, or call
329
[`fetch`](@ref) to wait and then obtain its return value.
330

331
Values can be interpolated into `@spawn` via `\$`, which copies the value
332
directly into the constructed underlying closure. This allows you to insert
333
the _value_ of a variable, isolating the asynchronous code from changes to
334
the variable's value in the current task.
335

336
!!! note
337
    See the manual chapter on [multi-threading](@ref man-multithreading)
338
    for important caveats. See also the chapter on [threadpools](@ref man-threadpools).
339

340
!!! compat "Julia 1.3"
341
    This macro is available as of Julia 1.3.
342

343
!!! compat "Julia 1.4"
344
    Interpolating values via `\$` is available as of Julia 1.4.
345

346
!!! compat "Julia 1.9"
347
    A threadpool may be specified as of Julia 1.9.
348
"""
349
macro spawn(args...)
    nargs = length(args)
    if nargs == 1
        tp = :default
        ex = args[1]
    elseif nargs == 2
        ttype, ex = args
        # TODO: allow unquoted symbols (a bare symbol currently becomes `nothing`
        # and is rejected below)
        ttype = ttype isa QuoteNode ? ttype.value : (ttype isa Symbol ? nothing : ttype)
        if !(ttype === :interactive || ttype === :default)
            throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
        end
        tp = ttype
    else
        throw(ArgumentError("wrong number of arguments in @spawn"))
    end

    # collect the `let` bindings for `$`-interpolated values before wrapping
    # the expression in a zero-argument closure
    letargs = Base._lift_one_interp!(ex)
    thunk = esc(:(()->($ex)))
    var = esc(Base.sync_varname)
    poolname = QuoteNode(tp)
    sync_in_scope = Expr(:islocal, var)

    quote
        let $(letargs...)
            local task = Task($thunk)
            task.sticky = false
            _spawn_set_thrpool(task, $poolname)
            # register the task with the `Base.sync_varname` variable when it is
            # a local in the caller's scope
            if $sync_in_scope
                put!($var, task)
            end
            schedule(task)
            task
        end
    end
end
388

389
# This is a stub that can be overloaded for downstream structures like `Channel`
# (declared here with no methods; methods are added elsewhere)
390
function foreach end
391

392
# Scheduling traits that can be employed for downstream overloads
# `AbstractSchedule` is the common supertype of the trait types below.
393
abstract type AbstractSchedule end
394
# Fieldless trait type; its intended meaning is not shown here.
struct StaticSchedule <: AbstractSchedule end
8✔
395
# Fieldless trait type; its intended meaning is not shown here.
struct FairSchedule <: AbstractSchedule end
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc