• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37550

pending completion
#37550

push

local

web-flow
Extend comparison lifting to `Core.ifelse` (#49882)

This change extends our existing transformation for:
   φ(a,b) === Const(c)   =>   φ(a === c, b === c)

to perform the analogous transformation for `Core.ifelse`:
   Core.ifelse(cond, a, b) === Const(c)
  =>
   Core.ifelse(cond, a === c, b === c)

89 of 89 new or added lines in 1 file covered. (100.0%)

72705 of 83986 relevant lines covered (86.57%)

20561515.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.11
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
"""
7
    Threads.threadid() -> Int
8

9
Get the ID number of the current thread of execution. The master thread has
10
ID `1`.
11

12
# Examples
13
```julia-repl
14
julia> Threads.threadid()
15
1
16

17
julia> Threads.@threads for i in 1:4
18
          println(Threads.threadid())
19
       end
20
4
21
2
22
5
23
4
24
```
25
"""
26
function threadid()
    # The runtime call yields an `Int16`; adding 1 gives the 1-based Julia-level id.
    raw_id = ccall(:jl_threadid, Int16, ())
    return Int(raw_id + 1)
end
26,084,787✔
27

28
# lower bound on the largest threadid()
29
"""
30
    Threads.maxthreadid() -> Int
31

32
Get a lower bound on the number of threads (across all thread pools) available
33
to the Julia process, with atomic-acquire semantics. The result will always be
34
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
35
any task you were able to observe before calling `maxthreadid`.
36
"""
37
function maxthreadid()
    # Read the runtime's `jl_n_threads` counter with acquire ordering.
    counter = cglobal(:jl_n_threads, Cint)
    return Int(Core.Intrinsics.atomic_pointerref(counter, :acquire))
end
28✔
38

39
"""
40
    Threads.nthreads(:default | :interactive) -> Int
41

42
Get the current number of threads within the specified thread pool. The threads in default
43
have id numbers `nthreads(:interactive) .+ (1:nthreads(:default))`; interactive threads have ids `1:nthreads(:interactive)`.
44

45
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
46
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
47
standard library and [`Threads.maxthreadid()`](@ref).
48
"""
49
function nthreads(pool::Symbol)
    # Thin alias: the per-pool count (and pool-name validation) lives in `threadpoolsize`.
    return threadpoolsize(pool)
end
×
50

51
# Return, as an `Int`, the entry for zero-based pool id `tpid` in the runtime's
# `jl_n_threads_per_pool` table.
function _nthreads_in_pool(tpid::Int8)
    # The global stores a pointer to `Cint` entries; load the pointer first.
    counts = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
    # `unsafe_load` indexes from 1, while pool ids start at 0.
    entry = unsafe_load(counts, tpid + 1)
    return Int(entry)
end
55

56
# Map a numeric threadpool id to its symbolic name: id 0 is `:interactive`,
# every other id is reported as `:default`.
function _tpid_to_sym(tpid::Int8)
    if tpid == 0
        return :interactive
    end
    return :default
end
59

60
# Map a threadpool name to its numeric id: `:interactive` is 0, any other symbol is 1.
function _sym_to_tpid(tp::Symbol)
    if tp === :interactive
        return Int8(0)
    else
        return Int8(1)
    end
end
63

64
"""
65
    Threads.threadpool(tid = threadid()) -> Symbol
66

67
Returns the specified thread's threadpool; either `:default` or `:interactive`.
68
"""
69
function threadpool(tid = threadid())
    # The runtime is queried with a zero-based thread index, hence `tid - 1`.
    pool_id = ccall(:jl_threadpoolid, Int8, (Int16,), tid - 1)
    return _tpid_to_sym(pool_id)
end
73

74
"""
75
    Threads.nthreadpools() -> Int
76

77
Returns the number of threadpools currently configured.
78
"""
79
function nthreadpools()
    # Plain (non-atomic) read of the runtime's `jl_n_threadpools` global.
    npools = unsafe_load(cglobal(:jl_n_threadpools, Cint))
    return Int(npools)
end
1✔
80

81
"""
82
    Threads.threadpoolsize(pool::Symbol = :default) -> Int
83

84
Get the number of threads available to the default thread pool (or to the
85
specified thread pool).
86

87
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
88
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
89
[`Distributed`](@ref man-distributed) standard library.
90
"""
91
function threadpoolsize(pool::Symbol = :default)
    # Only the two known pool names are accepted; anything else is an error.
    if !(pool === :default || pool === :interactive)
        error("invalid threadpool specified")
    end
    return _nthreads_in_pool(_sym_to_tpid(pool))
end
99

100
"""
101
    threadpooltids(pool::Symbol)
102

103
Returns a vector of IDs of threads in the given pool.
104
"""
105
function threadpooltids(pool::Symbol)
    # Interactive threads take the ids 1:n_interactive; default-pool ids follow them.
    n_interactive = _nthreads_in_pool(Int8(0))
    pool === :interactive && return collect(1:n_interactive)
    pool === :default || error("invalid threadpool specified")
    n_default = _nthreads_in_pool(Int8(1))
    return collect((n_interactive + 1):(n_interactive + n_default))
end
115

116
"""
117
    Threads.ngcthreads() -> Int
118

119
Returns the number of GC threads currently configured.
120
"""
121
function ngcthreads()
    configured = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint)))
    # NOTE(review): the `+ 1` presumably counts a thread not included in
    # `jl_n_gcthreads` — confirm against the runtime.
    return configured + 1
end
7✔
122

123
# Run `fun(i)` in one task per default-pool thread (`i` in `1:threadpoolsize()`),
# wait for all of them, and throw a `CompositeException` of `TaskFailedException`s
# if any task failed. With `static == true` each task is sticky and pinned to a
# specific thread; otherwise tasks are non-sticky.
function threading_run(fun, static)
    ccall(:jl_enter_threaded_region, Cvoid, ())
    ntasks = threadpoolsize()
    # Pinned thread indices are offset by the size of the interactive pool.
    interactive_count = threadpoolsize(:interactive)
    tasks = Vector{Task}(undef, ntasks)
    for i in 1:ntasks
        task = Task(() -> fun(i)) # pass in tid
        task.sticky = static
        if static
            # The runtime expects a zero-based thread index.
            ccall(:jl_set_task_tid, Cint, (Any, Cint), task, interactive_count + i - 1)
        end
        tasks[i] = task
        schedule(task)
    end
    # Wait for every task before leaving the threaded region; failures are
    # collected afterwards rather than thrown here.
    for task in tasks
        Base._wait(task)
    end
    ccall(:jl_exit_threaded_region, Cvoid, ())
    failures = filter(istaskfailed, tasks)
    if !isempty(failures)
        throw(CompositeException(map(TaskFailedException, failures)))
    end
    return nothing
end
144

145
# Build the expression that `@threads` expands to.
#
# `iter` is the `var = range` header of the user's `for` loop, `lbody` is the loop
# body, and `schedule` is one of `:default`, `:dynamic` or `:static`. The generated
# code defines a closure that runs one contiguous chunk of the iteration space for
# a given task number, then hands that closure to `threading_run`.
function _threadsfor(iter, lbody, schedule)
    loopvar = iter.args[1]      # the user's loop variable
    itr = iter.args[2]          # the user's iteration-space expression
    # Decided at expansion time: `:dynamic` and `:default` both run non-sticky tasks.
    dynamic = schedule === :dynamic || schedule === :default
    quote
        local threadsfor_fun
        let range = $(esc(itr))
        function threadsfor_fun(tid = 1; onethread = false)
            r = range # Load into local variable
            total = length(r)
            # split the iterations into `chunk`-sized pieces with `extra` left over
            if onethread
                tid = 1
                chunk, extra = total, 0
            else
                chunk, extra = divrem(total, threadpoolsize())
            end
            # fewer iterations than tasks: the first `extra` tasks take one each
            if chunk == 0
                tid > extra && return
                chunk, extra = 1, 0
            end
            # bounds of this task's chunk before the leftovers are handed out
            lo = firstindex(r) + ((tid - 1) * chunk)
            hi = lo + chunk - 1
            # spread the leftover iterations over the first `extra` tasks
            if extra > 0
                if tid <= extra
                    lo = lo + (tid - 1)
                    hi = hi + tid
                else
                    lo = lo + extra
                    hi = hi + extra
                end
            end
            # run this task's iterations
            for i = lo:hi
                local $(esc(loopvar)) = @inbounds r[i]
                $(esc(lbody))
            end
        end
        end
        if $dynamic
            threading_run(threadsfor_fun, false)
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
            error("`@threads :static` cannot be used concurrently or nested")
        else # :static
            threading_run(threadsfor_fun, true)
        end
        nothing
    end
end
198

199
"""
200
    Threads.@threads [schedule] for ... end
201

202
A macro to execute a `for` loop in parallel. The iteration space is distributed to
203
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
204
execution of the loop waits for the evaluation of all iterations.
205

206
See also: [`@spawn`](@ref Threads.@spawn) and
207
`pmap` in [`Distributed`](@ref man-distributed).
208

209
# Extended help
210

211
## Semantics
212

213
Unless stronger guarantees are specified by the scheduling option, the loop executed by
214
the `@threads` macro has the following semantics.
215

216
The `@threads` macro executes the loop body in an unspecified order and potentially
217
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
218
The assignments can be different for each execution. The loop body code (including any code
219
transitively called from it) must not make any assumptions about the distribution of
220
iterations to tasks or the worker thread in which they are executed. The loop body for each
221
iteration must be able to make forward progress independent of other iterations and be free
222
from data races. As such, invalid synchronizations across iterations may deadlock while
223
unsynchronized memory accesses may result in undefined behavior.
224

225
For example, the above conditions imply that:
226

227
- A lock taken in an iteration *must* be released within the same iteration.
228
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
229
- Write only to locations not shared across iterations (unless a lock or atomic operation is
230
  used).
231
- The value of [`threadid()`](@ref Threads.threadid) may change even within a single
232
  iteration.
233

234
## Schedulers
235

236
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
237
releases. Currently, `:dynamic` is used when the scheduler is not specified.
238

239
!!! compat "Julia 1.5"
240
    The `schedule` argument is available as of Julia 1.5.
241

242
### `:dynamic` (default)
243

244
`:dynamic` scheduler executes iterations dynamically to available worker threads. Current
245
implementation assumes that the workload for each iteration is uniform. However, this
246
assumption may be removed in the future.
247

248
This scheduling option is merely a hint to the underlying execution mechanism. However, a
249
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
250
bounded by a small constant multiple of the number of available worker threads
251
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
252
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
253
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
254
larger than the number of the worker threads and the run-time of `f(x)` is relatively
255
smaller than the cost of spawning and synchronizing a task (typically less than 10
256
microseconds).
257

258
!!! compat "Julia 1.8"
259
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
260

261
### `:static`
262

263
`:static` scheduler creates one task per thread and divides the iterations equally among
264
them, assigning each task specifically to each thread. In particular, the value of
265
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
266
Specifying `:static` is an error if used from inside another `@threads` loop or from a
267
thread other than 1.
268

269
!!! note
270
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
271
    In newly written library functions, `:static` scheduling is discouraged because the
272
    functions using this option cannot be called from arbitrary worker threads.
273

274
## Example
275

276
To illustrate the different scheduling strategies, consider the following function
277
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
278

279
```julia-repl
280
julia> function busywait(seconds)
281
            tstart = time_ns()
282
            while (time_ns() - tstart) / 1e9 < seconds
283
            end
284
        end
285

286
julia> @time begin
287
            Threads.@spawn busywait(5)
288
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
289
                busywait(1)
290
            end
291
        end
292
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
293

294
julia> @time begin
295
            Threads.@spawn busywait(5)
296
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
297
                busywait(1)
298
            end
299
        end
300
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
301
```
302

303
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
304
to run two of the 1-second iterations to complete the for loop.
305
"""
306
macro threads(args...)
    nargs = length(args)
    if nargs == 1
        # No schedule given: let `_threadsfor` pick the default behaviour.
        sched = :default
        ex = args[1]
    elseif nargs == 2
        sched, ex = args
        if sched isa QuoteNode
            sched = sched.value
        elseif sched isa Symbol
            # for now only allow quoted symbols
            sched = nothing
        end
        if !(sched === :static || sched === :dynamic)
            throw(ArgumentError("unsupported schedule argument in @threads"))
        end
    else
        throw(ArgumentError("wrong number of arguments in @threads"))
    end
    is_for_loop = isa(ex, Expr) && ex.head === :for
    is_for_loop || throw(ArgumentError("@threads requires a `for` loop expression"))
    # A single `var = range` header is required; anything else is rejected as a
    # nested outer loop.
    header = ex.args[1]
    if !(header isa Expr && header.head === :(=))
        throw(ArgumentError("nested outer loops are not currently supported by @threads"))
    end
    return _threadsfor(header, ex.args[2], sched)
end
333

334
# Assign task `t` to the threadpool named `tp`, falling back to the default pool
# when the requested pool currently has no threads.
function _spawn_set_thrpool(t::Task, tp::Symbol)
    pool_id = _sym_to_tpid(tp)
    if iszero(_nthreads_in_pool(pool_id))
        pool_id = _sym_to_tpid(:default)
    end
    ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, pool_id)
    return nothing
end
342

343
"""
344
    Threads.@spawn [:default|:interactive] expr
345

346
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
347
thread in the specified threadpool (`:default` if unspecified). The task is
348
allocated to a thread once one becomes available. To wait for the task to
349
finish, call [`wait`](@ref) on the result of this macro, or call
350
[`fetch`](@ref) to wait and then obtain its return value.
351

352
Values can be interpolated into `@spawn` via `\$`, which copies the value
353
directly into the constructed underlying closure. This allows you to insert
354
the _value_ of a variable, isolating the asynchronous code from changes to
355
the variable's value in the current task.
356

357
!!! note
358
    See the manual chapter on [multi-threading](@ref man-multithreading)
359
    for important caveats. See also the chapter on [threadpools](@ref man-threadpools).
360

361
!!! compat "Julia 1.3"
362
    This macro is available as of Julia 1.3.
363

364
!!! compat "Julia 1.4"
365
    Interpolating values via `\$` is available as of Julia 1.4.
366

367
!!! compat "Julia 1.9"
368
    A threadpool may be specified as of Julia 1.9.
369

370
# Examples
371
```julia-repl
372
julia> t() = println("Hello from ", Threads.threadid());
373

374
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
375
Hello from 1
376
Hello from 1
377
Hello from 3
378
Hello from 4
379
```
380
"""
381
macro spawn(args...)
    nargs = length(args)
    pool = :default
    if nargs == 1
        ex = args[1]
    elseif nargs == 2
        ttype, ex = args
        if ttype isa QuoteNode
            ttype = ttype.value
        elseif ttype isa Symbol
            # TODO: allow unquoted symbols
            ttype = nothing
        end
        if !(ttype === :interactive || ttype === :default)
            throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
        end
        pool = ttype
    else
        throw(ArgumentError("wrong number of arguments in @spawn"))
    end

    # Lift `$`-interpolated values out of `ex` into `let` bindings. This mutates
    # `ex`, so it must run before the thunk is built from it.
    letargs = Base._lift_one_interp!(ex)

    thunk = Base.replace_linenums!(:(()->($(esc(ex)))), __source__)
    var = esc(Base.sync_varname)
    poolnode = QuoteNode(pool)
    quote
        let $(letargs...)
            local task = Task($thunk)
            task.sticky = false
            _spawn_set_thrpool(task, $poolnode)
            # Register the task with the sync variable when one is local at the call site.
            if $(Expr(:islocal, var))
                put!($var, task)
            end
            schedule(task)
            task
        end
    end
end
420

421
# This is a stub that can be overloaded for downstream structures like `Channel`.
# It only declares the generic function `foreach` here; no methods are attached
# at this point.
function foreach end
423

424
# Scheduling traits that can be employed for downstream overloads.
# `AbstractSchedule` is the common supertype; the two field-less subtypes below
# are singleton markers intended to be dispatched on.
# NOTE(review): what "static" versus "fair" scheduling means is determined by the
# methods that dispatch on these types, which are not visible here.
abstract type AbstractSchedule end
struct StaticSchedule <: AbstractSchedule end
struct FairSchedule <: AbstractSchedule end
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc