• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37848

24 Jul 2024 06:36AM UTC coverage: 87.54% (+0.03%) from 87.511%
#37848

push

local

web-flow
inference: refine branched `Conditional` types (#55216)

Separated from JuliaLang/julia#40880.
This subtle adjustment allows for more accurate type inference in the
following kind of cases:
```julia
function condition_object_update2(x)
    cond = x isa Int
    if cond # `cond` is known to be `Const(true)` within this branch
        return !cond ? nothing : x # ::Int
    else
        return  cond ? nothing : 1 # ::Int
    end
end
@test Base.infer_return_type(condition_object_update2, (Any,)) == Int
```

Also cleans up typelattice.jl a bit.

30 of 31 new or added lines in 1 file covered. (96.77%)

57 existing lines in 4 files now uncovered.

77540 of 88577 relevant lines covered (87.54%)

16058405.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.9
/base/threadingconstructs.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
export threadid, nthreads, @threads, @spawn,
4
       threadpool, nthreadpools
5

6
"""
7
    Threads.threadid() -> Int
8

9
Get the ID number of the current thread of execution. The master thread has
10
ID `1`.
11

12
# Examples
13
```julia-repl
14
julia> Threads.threadid()
15
1
16

17
julia> Threads.@threads for i in 1:4
18
          println(Threads.threadid())
19
       end
20
4
21
2
22
5
23
4
24
```
25

26
!!! note
27
    The thread that a task runs on may change if the task yields, which is known as [`Task Migration`](@ref man-task-migration).
28
    For this reason, in most cases it is not safe to use `threadid()` to index into, say, a vector of buffers or stateful objects.
29

30
"""
31
# Calls the runtime's `jl_threadid` (result read as Int16), adds 1, and widens to Int.
# NOTE(review): the +1 presumably converts a 0-based runtime index to Julia's 1-based ids — confirm.
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)
2,608,854✔
32

33
# lower bound on the largest threadid()
34
"""
35
    Threads.maxthreadid() -> Int
36

37
Get a lower bound on the number of threads (across all thread pools) available
38
to the Julia process, with atomic-acquire semantics. The result will always be
39
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
40
any task you were able to observe before calling `maxthreadid`.
41
"""
42
# Loads the C global `jl_n_threads` (a Cint) with an `:acquire` atomic read and widens it to Int.
maxthreadid() = Int(Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire))
73✔
43

44
"""
45
    Threads.nthreads(:default | :interactive) -> Int
46

47
Get the current number of threads within the specified thread pool. The threads in `:interactive`
48
have id numbers `1:nthreads(:interactive)`, and the threads in `:default` have id numbers in
49
`nthreads(:interactive) .+ (1:nthreads(:default))`.
50

51
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
52
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
53
standard library and [`Threads.maxthreadid()`](@ref).
54
"""
55
# Thin forwarder: the count (and any error for an unsupported pool name) comes from `threadpoolsize(pool)`.
nthreads(pool::Symbol) = threadpoolsize(pool)
12✔
56

57
# Return, as an Int, the thread count of the pool with id `tpid`.
# The count is read from the array that the C global `jl_n_threads_per_pool` points to;
# `tpid + 1` is the element index handed to `unsafe_load`.
# No validation of `tpid` happens here, so callers must pass an id that has an entry in that array.
function _nthreads_in_pool(tpid::Int8)
58
    p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
57,660✔
59
    return Int(unsafe_load(p, tpid + 1))
57,660✔
60
end
61

62
# Translate a numeric threadpool id into its Symbol name:
#   0 -> :interactive, 1 -> :default, -1 -> :foreign.
# Any other id throws an ArgumentError naming the offending value.
# This is the inverse of the mapping in `_sym_to_tpid`.
function _tpid_to_sym(tpid::Int8)
63
    if tpid == 0
27,244✔
64
        return :interactive
9✔
65
    elseif tpid == 1
27,235✔
66
        return :default
27,235✔
67
    elseif tpid == -1
×
68
        return :foreign
×
69
    else
70
        throw(ArgumentError(LazyString("Unrecognized threadpool id ", tpid)))
×
71
    end
72
end
73

74
# Translate a threadpool name into its numeric id, always returned as an Int8:
#   :interactive -> 0, :default -> 1, :foreign -> -1.
# Any other Symbol throws an ArgumentError that quotes the name.
# This is the inverse of the mapping in `_tpid_to_sym`.
function _sym_to_tpid(tp::Symbol)
761✔
75
    if tp === :interactive
27,975✔
76
        return Int8(0)
16✔
77
    elseif tp === :default
27,959✔
78
        return Int8(1)
27,958✔
79
    elseif tp == :foreign
1✔
80
        return Int8(-1)
×
81
    else
82
        throw(ArgumentError(LazyString("Unrecognized threadpool name `", tp, "`")))
1✔
83
    end
84
end
85

86
"""
87
    Threads.threadpool(tid = threadid()) -> Symbol
88

89
Returns the specified thread's threadpool; either `:default`, `:interactive`, or `:foreign`.
90
"""
91
# Ask the runtime (`jl_threadpoolid`) for the pool id of thread `tid`, then convert it to a Symbol
# via `_tpid_to_sym`, which throws ArgumentError for ids it does not recognise.
# `tid-1` is passed as an Int16.
# NOTE(review): the -1 presumably maps Julia's 1-based thread id to a 0-based runtime index — confirm.
function threadpool(tid = threadid())
92
    tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
30✔
93
    return _tpid_to_sym(tpid)
60✔
94
end
95

96
"""
97
    Threads.threadpooldescription(tid = threadid()) -> String
98

99
Returns the specified thread's threadpool name with extended description where appropriate.
100
"""
101
# Return the pool name of thread `tid` as a String.
# The only extended description produced is "foreign: gc": it is returned when the thread is in
# the :foreign pool and `tid` falls in the `ngcthreads()`-wide window of ids directly after the
# interactive + default threads. Every other case returns `string(threadpool(tid))`.
function threadpooldescription(tid = threadid())
30✔
102
    threadpool_name = threadpool(tid)
60✔
103
    if threadpool_name == :foreign
30✔
104
        # TODO: extend tls to include a field to add a description to a foreign thread and make this more general
UNCOV
105
        n_others = nthreads(:interactive) + nthreads(:default)
×
106
        # Assumes GC threads come first in the foreign thread pool
UNCOV
107
        if tid > n_others && tid <= n_others + ngcthreads()
×
UNCOV
108
            return "foreign: gc"
×
109
        end
110
    end
111
    return string(threadpool_name)
30✔
112
end
113

114
"""
115
    Threads.nthreadpools() -> Int
116

117
Returns the number of threadpools currently configured.
118
"""
119
# Plain (non-atomic) load of the C global `jl_n_threadpools` (a Cint), widened to Int.
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))
×
120

121
"""
122
    Threads.threadpoolsize(pool::Symbol = :default) -> Int
123

124
Get the number of threads available to the default thread pool (or to the
125
specified thread pool).
126

127
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
128
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
129
[`Distributed`](@ref man-distributed) standard library.
130
"""
131
# Return the number of threads in `pool` (default: :default).
# Only :default and :interactive have a size. :foreign and any other name raise through `error`,
# so `tpid` is always assigned by the time the final line runs.
function threadpoolsize(pool::Symbol = :default)
132
    if pool === :default || pool === :interactive
27,296✔
133
        tpid = _sym_to_tpid(pool)
54,431✔
134
    elseif pool == :foreign
×
UNCOV
135
        error("Threadpool size of `:foreign` is indeterminant")
×
136
    else
UNCOV
137
        error("invalid threadpool specified")
×
138
    end
139
    return _nthreads_in_pool(tpid)
54,270✔
140
end
141

142
"""
143
    threadpooltids(pool::Symbol)
144

145
Returns a vector of IDs of threads in the given pool.
146
"""
UNCOV
147
# Return a freshly collected Vector of the thread ids belonging to `pool`.
# With ni = size of pool id 0 (interactive) and nd = size of pool id 1 (default):
#   :interactive -> 1:ni
#   :default     -> ni+1:ni+nd
# Any other pool name, including :foreign, raises via `error`.
function threadpooltids(pool::Symbol)
×
UNCOV
148
    ni = _nthreads_in_pool(Int8(0))
×
UNCOV
149
    if pool === :interactive
×
UNCOV
150
        return collect(1:ni)
×
UNCOV
151
    elseif pool === :default
×
UNCOV
152
        return collect(ni+1:ni+_nthreads_in_pool(Int8(1)))
×
153
    else
UNCOV
154
        error("invalid threadpool specified")
×
155
    end
156
end
157

158
"""
159
    Threads.ngcthreads() -> Int
160

161
Returns the number of GC threads currently configured.
162
This includes both mark threads and concurrent sweep threads.
163
"""
164
# Value of the C global `jl_n_gcthreads` (a Cint) widened to Int, plus one.
# NOTE(review): the reason for the +1 is not visible in this file — confirm against the runtime.
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1
16✔
165

166
# Run `fun(i)` for i in 1:n on n freshly created tasks, where n = threadpoolsize() (the :default
# pool size), and block until all of them have finished.
#
# Arguments:
#   fun    - callable taking the task's 1-based index.
#   static - stored into each task's `sticky` field; when true every task is also given an
#            explicit thread via `jl_set_task_tid`, otherwise it is assigned to the :default pool.
#
# The work is bracketed by `jl_enter_threaded_region` / `jl_exit_threaded_region`.
# If any task failed, a CompositeException holding one TaskFailedException per failed task is
# thrown after the region has been exited.
# NOTE(review): the exit call is not in a `finally`, so an exception raised while the tasks are
# being created or waited on would leave the region entered — confirm this is intended.
function threading_run(fun, static)
1✔
167
    ccall(:jl_enter_threaded_region, Cvoid, ())
1✔
168
    n = threadpoolsize()
1✔
169
    tid_offset = threadpoolsize(:interactive)
1✔
170
    tasks = Vector{Task}(undef, n)
2✔
171
    for i = 1:n
1✔
172
        t = Task(() -> fun(i)) # pass in tid
2✔
173
        t.sticky = static
1✔
174
        if static
1✔
UNCOV
175
            # Target index skips past the interactive pool's threads; the ccall's return value is ignored.
            ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid_offset + i-1)
×
176
        else
177
            # TODO: this should be the current pool (except interactive) if there
178
            # are ever more than two pools.
179
            _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, _sym_to_tpid(:default))
1✔
180
            @assert _result == 1
1✔
181
        end
182
        tasks[i] = t
1✔
183
        schedule(t)
1✔
184
    end
1✔
185
    for i = 1:n
1✔
186
        Base._wait(tasks[i])
1✔
187
    end
1✔
188
    ccall(:jl_exit_threaded_region, Cvoid, ())
1✔
189
    # filter! mutates `tasks` in place; it is not used again after this point.
    failed_tasks = filter!(istaskfailed, tasks)
1✔
190
    if !isempty(failed_tasks)
1✔
191
        throw(CompositeException(map(TaskFailedException, failed_tasks)))
×
192
    end
193
end
194

195
# Build the expression that `@threads` expands to.
#
# Arguments:
#   iter     - the loop header expression; args[1] is the index, args[2] the iterated range.
#   lbody    - the loop body expression.
#   schedule - Symbol; :greedy selects `greedy_func`, anything else `default_func`.
#
# The returned quote defines `threadsfor_fun` (spliced in from the chosen builder) and then
# dispatches to `threading_run`. The greedy/dynamic/default test is evaluated here, at expansion
# time, and spliced in as a literal Bool; the remaining branches are the :static case, which
# errors if `jl_in_threaded_region` reports a region is already active. The quote evaluates to
# `nothing`.
function _threadsfor(iter, lbody, schedule)
1✔
196
    lidx = iter.args[1]         # index
1✔
197
    range = iter.args[2]
1✔
198
    esc_range = esc(range)
1✔
199
    func = if schedule === :greedy
1✔
200
        greedy_func(esc_range, lidx, lbody)
×
201
    else
202
        default_func(esc_range, lidx, lbody)
2✔
203
    end
204
    quote
1✔
205
        local threadsfor_fun
206
        $func
207
        if $(schedule === :greedy || schedule === :dynamic || schedule === :default)
1✔
208
            threading_run(threadsfor_fun, false)
1✔
209
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
×
210
            error("`@threads :static` cannot be used concurrently or nested")
×
211
        else # :static
UNCOV
212
            threading_run(threadsfor_fun, true)
×
213
        end
214
        nothing
1✔
215
    end
216
end
217

UNCOV
218
# Build the `threadsfor_fun` definition used for the :greedy schedule.
#
# Arguments:
#   itr   - expression for the iterable, spliced in as-is (the caller `_threadsfor` passes it
#           already escaped).
#   lidx  - loop index expression; escaped here and bound as a `local` in each iteration.
#   lbody - loop body expression; escaped here.
#
# The generated code creates a zero-capacity Channel whose producer (constructed with
# `spawn=true`) puts every item of the iterable into it. `threadsfor_fun(tid)` iterates over that
# shared channel, so each call takes whatever items it manages to pull. `tid` is not used in the
# generated body.
function greedy_func(itr, lidx, lbody)
×
UNCOV
219
    quote
×
UNCOV
220
        let c = Channel{eltype($itr)}(0,spawn=true) do ch
×
UNCOV
221
            for item in $itr
×
UNCOV
222
                put!(ch, item)
×
UNCOV
223
            end
×
224
        end
225
        function threadsfor_fun(tid)
×
226
            for item in c
×
UNCOV
227
                local $(esc(lidx)) = item
×
UNCOV
228
                $(esc(lbody))
×
UNCOV
229
            end
×
230
        end
231
        end
232
    end
233
end
234

235
# Build the `threadsfor_fun` definition used for every schedule except :greedy.
#
# Arguments:
#   itr   - expression for the iterable, spliced in as-is (the caller `_threadsfor` passes it
#           already escaped); the generated code calls `length`, `firstindex` and indexing on it.
#   lidx  - loop index expression; escaped here and bound as a `local` in each iteration.
#   lbody - loop body expression; escaped here.
#
# Generated `threadsfor_fun(tid = 1; onethread = false)` runs one contiguous chunk of the range:
#   * len, rem = divrem(length, threadpoolsize()); with `onethread` the whole range is one chunk.
#   * If len == 0 (fewer items than threads), the first `rem` tids get one item each and the
#     rest return immediately.
#   * Otherwise chunk `tid` starts at firstindex + (tid-1)*len, and the first `rem` chunks are
#     each one item longer, with later chunks shifted by `rem` to stay contiguous.
# Elements are read with `@inbounds r[i]`, relying on f:l staying within the range's indices.
function default_func(itr, lidx, lbody)
1✔
236
    quote
1✔
237
        let range = $itr
1✔
238
        function threadsfor_fun(tid = 1; onethread = false)
3✔
239
            r = range # Load into local variable
1✔
240
            lenr = length(r)
1✔
241
            # divide loop iterations among threads
242
            if onethread
1✔
243
                tid = 1
×
244
                len, rem = lenr, 0
×
245
            else
246
                len, rem = divrem(lenr, threadpoolsize())
1✔
247
            end
248
            # not enough iterations for all the threads?
249
            if len == 0
1✔
UNCOV
250
                if tid > rem
×
UNCOV
251
                    return
×
252
                end
UNCOV
253
                len, rem = 1, 0
×
254
            end
255
            # compute this thread's iterations
256
            f = firstindex(r) + ((tid-1) * len)
1✔
257
            l = f + len - 1
1✔
258
            # distribute remaining iterations evenly
259
            if rem > 0
1✔
UNCOV
260
                if tid <= rem
×
UNCOV
261
                    f = f + (tid-1)
×
UNCOV
262
                    l = l + tid
×
263
                else
UNCOV
264
                    f = f + rem
×
UNCOV
265
                    l = l + rem
×
266
                end
267
            end
268
            # run this thread's iterations
269
            for i = f:l
1✔
270
                local $(esc(lidx)) = @inbounds r[i]
30✔
271
                $(esc(lbody))
30✔
272
            end
30✔
273
        end
274
        end
275
    end
276
end
277

278
"""
279
    Threads.@threads [schedule] for ... end
280

281
A macro to execute a `for` loop in parallel. The iteration space is distributed to
282
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
283
execution of the loop waits for the evaluation of all iterations.
284

285
See also: [`@spawn`](@ref Threads.@spawn) and
286
`pmap` in [`Distributed`](@ref man-distributed).
287

288
# Extended help
289

290
## Semantics
291

292
Unless stronger guarantees are specified by the scheduling option, the loop executed by
293
the `@threads` macro has the following semantics.
294

295
The `@threads` macro executes the loop body in an unspecified order and potentially
296
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
297
The assignments can be different for each execution. The loop body code (including any code
298
transitively called from it) must not make any assumptions about the distribution of
299
iterations to tasks or the worker thread in which they are executed. The loop body for each
300
iteration must be able to make forward progress independent of other iterations and be free
301
from data races. As such, invalid synchronizations across iterations may deadlock while
302
unsynchronized memory accesses may result in undefined behavior.
303

304
For example, the above conditions imply that:
305

306
- A lock taken in an iteration *must* be released within the same iteration.
307
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
308
- Write only to locations not shared across iterations (unless a lock or atomic operation is
309
  used).
310
- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
311
  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
312

313
## Schedulers
314

315
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
316
releases. Currently, `:dynamic` is used when the scheduler is not specified.
317

318
!!! compat "Julia 1.5"
319
    The `schedule` argument is available as of Julia 1.5.
320

321
### `:dynamic` (default)
322

323
The `:dynamic` scheduler assigns iterations dynamically to available worker threads. The current
324
implementation assumes that the workload for each iteration is uniform. However, this
325
assumption may be removed in the future.
326

327
This scheduling option is merely a hint to the underlying execution mechanism. However, a
328
few properties can be expected. The number of `Task`s used by the `:dynamic` scheduler is
329
bounded by a small constant multiple of the number of available worker threads
330
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
331
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
332
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
333
larger than the number of the worker threads and the run-time of `f(x)` is relatively
334
smaller than the cost of spawning and synchronizing a task (typically less than 10
335
microseconds).
336

337
!!! compat "Julia 1.8"
338
    The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
339

340
### `:greedy`
341

342
The `:greedy` scheduler spawns up to [`Threads.threadpoolsize()`](@ref) tasks, each greedily working on
343
the given iterated values as they are produced. As soon as one task finishes its work, it takes
344
the next value from the iterator. Work done by any individual task is not necessarily on
345
contiguous values from the iterator. The given iterator may produce values forever; only the
346
iterator interface is required (no indexing).
347

348
This scheduling option is generally a good choice if the workload of individual iterations
349
is not uniform/has a large spread.
350

351
!!! compat "Julia 1.11"
352
    The `:greedy` option for the `schedule` argument is available as of Julia 1.11.
353

354
### `:static`
355

356
The `:static` scheduler creates one task per thread and divides the iterations equally among
357
them, assigning each task specifically to each thread. In particular, the value of
358
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
359
Specifying `:static` is an error if used from inside another `@threads` loop or from a
360
thread other than 1.
361

362
!!! note
363
    `:static` scheduling exists for supporting transition of code written before Julia 1.3.
364
    In newly written library functions, `:static` scheduling is discouraged because the
365
    functions using this option cannot be called from arbitrary worker threads.
366

367
## Examples
368

369
To illustrate the different scheduling strategies, consider the following function
370
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
371

372
```julia-repl
373
julia> function busywait(seconds)
374
            tstart = time_ns()
375
            while (time_ns() - tstart) / 1e9 < seconds
376
            end
377
        end
378

379
julia> @time begin
380
            Threads.@spawn busywait(5)
381
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
382
                busywait(1)
383
            end
384
        end
385
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
386

387
julia> @time begin
388
            Threads.@spawn busywait(5)
389
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
390
                busywait(1)
391
            end
392
        end
393
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
394
```
395

396
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
397
to run two of the 1-second iterations to complete the for loop.
398
"""
399
# Argument handling for `@threads`, performed at macro-expansion time:
#   * two arguments -> (schedule, loop). The schedule must be a quoted symbol (QuoteNode) whose
#     value is :static, :dynamic or :greedy. A bare Symbol is deliberately replaced by `nothing`
#     so that it fails the same check. Failures throw ArgumentError.
#   * one argument  -> the loop, with the schedule set to :default.
#   * anything else -> ArgumentError.
# The loop must be a `for` expression whose first argument is a single `=` expression; both
# requirements are enforced with ArgumentError. The expansion itself is built by `_threadsfor`.
macro threads(args...)
1✔
400
    na = length(args)
1✔
401
    if na == 2
1✔
UNCOV
402
        sched, ex = args
×
UNCOV
403
        if sched isa QuoteNode
×
404
            sched = sched.value
×
UNCOV
405
        elseif sched isa Symbol
×
406
            # for now only allow quoted symbols
UNCOV
407
            sched = nothing
×
408
        end
UNCOV
409
        if sched !== :static && sched !== :dynamic && sched !== :greedy
×
UNCOV
410
            throw(ArgumentError("unsupported schedule argument in @threads"))
×
411
        end
412
    elseif na == 1
1✔
413
        sched = :default
1✔
414
        ex = args[1]
1✔
415
    else
UNCOV
416
        throw(ArgumentError("wrong number of arguments in @threads"))
×
417
    end
418
    if !(isa(ex, Expr) && ex.head === :for)
1✔
UNCOV
419
        throw(ArgumentError("@threads requires a `for` loop expression"))
×
420
    end
421
    if !(ex.args[1] isa Expr && ex.args[1].head === :(=))
1✔
UNCOV
422
        throw(ArgumentError("nested outer loops are not currently supported by @threads"))
×
423
    end
424
    return _threadsfor(ex.args[1], ex.args[2], sched)
1✔
425
end
426

427
# Assign task `t` to the threadpool named `tp`, falling back to :default when `tp` maps to
# id -1 (:foreign) or when the requested pool currently has zero threads.
# The short-circuit `||` keeps `_nthreads_in_pool` from being called with the id -1.
# An unrecognised name throws ArgumentError from `_sym_to_tpid`.
# The runtime call `jl_set_task_threadpoolid` is expected to return 1, which is checked with
# `@assert`. Always returns `nothing`.
function _spawn_set_thrpool(t::Task, tp::Symbol)
2✔
428
    tpid = _sym_to_tpid(tp)
2,192✔
429
    if tpid == -1 || _nthreads_in_pool(tpid) == 0
3,105✔
UNCOV
430
        tpid = _sym_to_tpid(:default)
×
431
    end
432
    _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, tpid)
3,102✔
433
    @assert _result == 1
3,102✔
434
    nothing
2,185✔
435
end
436

437
"""
438
    Threads.@spawn [:default|:interactive] expr
439

440
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
441
thread in the specified threadpool (`:default` if unspecified). The task is
442
allocated to a thread once one becomes available. To wait for the task to
443
finish, call [`wait`](@ref) on the result of this macro, or call
444
[`fetch`](@ref) to wait and then obtain its return value.
445

446
Values can be interpolated into `@spawn` via `\$`, which copies the value
447
directly into the constructed underlying closure. This allows you to insert
448
the _value_ of a variable, isolating the asynchronous code from changes to
449
the variable's value in the current task.
450

451
!!! note
452
    The thread that the task runs on may change if the task yields, therefore `threadid()` should not
453
    be treated as constant for a task. See [`Task Migration`](@ref man-task-migration), and the broader
454
    [multi-threading](@ref man-multithreading) manual for further important caveats.
455
    See also the chapter on [threadpools](@ref man-threadpools).
456

457
!!! compat "Julia 1.3"
458
    This macro is available as of Julia 1.3.
459

460
!!! compat "Julia 1.4"
461
    Interpolating values via `\$` is available as of Julia 1.4.
462

463
!!! compat "Julia 1.9"
464
    A threadpool may be specified as of Julia 1.9.
465

466
# Examples
467
```julia-repl
468
julia> t() = println("Hello from ", Threads.threadid());
469

470
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
471
Hello from 1
472
Hello from 1
473
Hello from 3
474
Hello from 4
475
```
476
"""
477
# Argument handling and expansion for `@spawn`:
#   * two arguments -> (threadpool, expr). A quoted symbol must be :interactive or :default,
#     otherwise ArgumentError is thrown at expansion time. Any other expression is passed through
#     unchanged and escaped, so it is evaluated in the caller's scope at run time.
#   * one argument  -> expr, with the pool defaulting to :default.
#   * anything else -> ArgumentError.
# The expansion wraps `expr` in a zero-argument closure, creates a non-sticky Task from it,
# assigns its threadpool via `_spawn_set_thrpool`, schedules it, and evaluates to the task.
macro spawn(args...)
12✔
478
    tp = QuoteNode(:default)
12✔
479
    na = length(args)
12✔
480
    if na == 2
12✔
UNCOV
481
        ttype, ex = args
×
UNCOV
482
        if ttype isa QuoteNode
×
UNCOV
483
            ttype = ttype.value
×
UNCOV
484
            if ttype !== :interactive && ttype !== :default
×
UNCOV
485
                throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", ttype)))
×
486
            end
UNCOV
487
            tp = QuoteNode(ttype)
×
488
        else
UNCOV
489
            tp = ttype
×
490
        end
491
    elseif na == 1
12✔
492
        ex = args[1]
12✔
493
    else
UNCOV
494
        throw(ArgumentError("wrong number of arguments in @spawn"))
×
495
    end
496

497
    # NOTE(review): presumably rewrites `$`-interpolations inside `ex` in place and returns the
    # matching bindings, which are spliced into the `let` header below — confirm in Base.
    letargs = Base._lift_one_interp!(ex)
12✔
498

499
    thunk = Base.replace_linenums!(:(()->($(esc(ex)))), __source__)
12✔
500
    var = esc(Base.sync_varname)
12✔
501
    quote
12✔
502
        let $(letargs...)
503
            local task = Task($thunk)
3,097✔
504
            task.sticky = false
3,097✔
505
            _spawn_set_thrpool(task, $(esc(tp)))
3,097✔
506
            # Only when the sync variable is a local in the caller's scope is the task pushed to it.
            # NOTE(review): presumably this is how an enclosing `@sync` block tracks the task — confirm.
            if $(Expr(:islocal, var))
2,183✔
507
                put!($var, task)
2,059✔
508
            end
509
            schedule(task)
3,097✔
510
            task
2,186✔
511
        end
512
    end
513
end
514

515
# This is a stub that can be overloaded for downstream structures like `Channel`
516
# Declares the generic function `foreach` in this module without attaching any methods to it.
function foreach end
517

518
# Scheduling traits that can be employed for downstream overloads
519
# Abstract supertype of the schedule trait types declared below.
abstract type AbstractSchedule end
520
# Both concrete schedules are field-less structs, so each carries information only through its type.
struct StaticSchedule <: AbstractSchedule end
521
struct FairSchedule <: AbstractSchedule end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc