• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37593

pending completion
#37593

push

local

web-flow
fix O(n^2) `length` calls in compact-ir lowering step (#50756)

This can be a problem for very long function bodies.

73982 of 84556 relevant lines covered (87.49%)

20204653.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/stdlib/Distributed/src/macros.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
# Round-robin worker selection. The counter `nextidx` is private to `nextproc`: it is
# only reachable through the enclosing `let`, and it is atomic so that concurrent
# callers each obtain a distinct counter value.
let nextidx = Threads.Atomic{Int}(0)
    global nextproc
    # Return the id of the worker that should receive the next automatically placed call.
    function nextproc()
        # `atomic_add!` returns the value held before the increment.
        idx = Threads.atomic_add!(nextidx, 1)
        # Take a single snapshot of the worker list so that the index is always computed
        # against the same list it is used on (previously `nworkers()` and `workers()`
        # were queried separately and could disagree if the worker set changed between
        # the two calls).
        pids = workers()
        # `mod` never yields a negative result (unlike `%`), so the index stays within
        # bounds even if the counter wraps around to negative values.
        return pids[mod(idx, length(pids)) + 1]
    end
end
10

11
# Invoke the zero-argument callable `thunk` on process `p` through `remotecall` and
# return whatever `remotecall` returns.
function spawnat(p, thunk)
    return remotecall(thunk, p)
end
100,251✔
12

13
# Run `thunk` on the process selected by `nextproc()`.
function spawn_somewhere(thunk)
    target = nextproc()
    return spawnat(target, thunk)
end
12✔
14

15
"""
16
    @spawn expr
17

18
Create a closure around an expression and run it on an automatically-chosen process,
19
returning a [`Future`](@ref) to the result.
20
This macro is deprecated; `@spawnat :any expr` should be used instead.
21

22
# Examples
23
```julia-repl
24
julia> addprocs(3);
25

26
julia> f = @spawn myid()
27
Future(2, 1, 5, nothing)
28

29
julia> fetch(f)
30
2
31

32
julia> f = @spawn myid()
33
Future(3, 1, 7, nothing)
34

35
julia> fetch(f)
36
3
37
```
38

39
!!! compat "Julia 1.3"
40
    As of Julia 1.3 this macro is deprecated. Use `@spawnat :any` instead.
41
"""
42
macro spawn(expr)
    # Name of the variable that collects spawned work, escaped so that it refers to
    # the caller's scope.
    syncvar = esc(Base.sync_varname)
    # Zero-argument closure around the user's expression, also resolved in caller scope.
    closure = esc(:(() -> $expr))
    return quote
        local ref = spawn_somewhere($closure)
        # Only hand the reference over when the caller actually has the sync variable
        # as a local.
        if $(Expr(:islocal, syncvar))
            put!($syncvar, ref)
        end
        ref
    end
end
53

54
"""
55
    @spawnat p expr
56

57
Create a closure around an expression and run the closure
58
asynchronously on process `p`. Return a [`Future`](@ref) to the result.
59
If `p` is the quoted literal symbol `:any`, then the system will pick a
60
processor to use automatically.
61

62
# Examples
63
```julia-repl
64
julia> addprocs(3);
65

66
julia> f = @spawnat 2 myid()
67
Future(2, 1, 3, nothing)
68

69
julia> fetch(f)
70
2
71

72
julia> f = @spawnat :any myid()
73
Future(3, 1, 7, nothing)
74

75
julia> fetch(f)
76
3
77
```
78

79
!!! compat "Julia 1.3"
80
    The `:any` argument is available as of Julia 1.3.
81
"""
82
macro spawnat(p, expr)
    syncvar = esc(Base.sync_varname)
    # Zero-argument closure around the user's expression, resolved in caller scope.
    closure = esc(:(() -> $expr))
    # The literal `:any` reaches the macro as a `QuoteNode`; in that case the target
    # process is chosen automatically, otherwise `p` is evaluated in caller scope.
    call = if p === QuoteNode(:any)
        :(spawn_somewhere($closure))
    else
        :(spawnat($(esc(p)), $closure))
    end
    return quote
        local ref = $call
        # Only hand the reference over when the caller actually has the sync variable
        # as a local.
        if $(Expr(:islocal, syncvar))
            put!($syncvar, ref)
        end
        ref
    end
end
98

99
"""
100
    @fetch expr
101

102
Equivalent to `fetch(@spawnat :any expr)`.
103
See [`fetch`](@ref) and [`@spawnat`](@ref).
104

105
# Examples
106
```julia-repl
107
julia> addprocs(3);
108

109
julia> @fetch myid()
110
2
111

112
julia> @fetch myid()
113
3
114

115
julia> @fetch myid()
116
4
117

118
julia> @fetch myid()
119
2
120
```
121
"""
122
macro fetch(expr)
    # Zero-argument closure around the user's expression, escaped so that it is
    # resolved in the caller's scope.
    closure = esc(:(() -> $expr))
    # The target process is picked by `nextproc()` when the expansion runs.
    return :(remotecall_fetch($closure, nextproc()))
end
126

127
"""
128
    @fetchfrom p expr
129

130
Equivalent to `fetch(@spawnat p expr)`.
131
See [`fetch`](@ref) and [`@spawnat`](@ref).
132

133
# Examples
134
```julia-repl
135
julia> addprocs(3);
136

137
julia> @fetchfrom 2 myid()
138
2
139

140
julia> @fetchfrom 4 myid()
141
4
142
```
143
"""
144
macro fetchfrom(p, expr)
    # Both the target process expression and the closure are escaped so that they
    # are evaluated in the caller's scope.
    target = esc(p)
    closure = esc(:(() -> $expr))
    return :(remotecall_fetch($closure, $target))
end
148

149
# extract a list of modules to import from an expression
150
# Fallback: a value that is not an `Expr` contributes no import statements.
extract_imports!(imports, x) = imports

# Append every `import`/`using` expression found in `ex` to `imports` and return
# `imports`. Only `let` bodies and `toplevel`/`block` children are searched
# recursively; other expression kinds are not descended into.
function extract_imports!(imports, ex::Expr)
    head = ex.head
    if head === :import || head === :using
        push!(imports, ex)
    elseif head === :let
        # args[2] of a `let` expression is its body
        extract_imports!(imports, ex.args[2])
    elseif head === :toplevel || head === :block
        foreach(arg -> extract_imports!(imports, arg), ex.args)
    end
    return imports
end

# Convenience wrapper that collects into a fresh `Any[]`.
extract_imports(x) = extract_imports!(Any[], x)
59✔
164

165
"""
166
    @everywhere [procs()] expr
167

168
Execute an expression under `Main` on all `procs`.
169
Errors on any of the processes are collected into a
170
[`CompositeException`](@ref) and thrown. For example:
171

172
    @everywhere bar = 1
173

174
will define `Main.bar` on all current processes. Any processes added later
175
(say with [`addprocs()`](@ref)) will not have the expression defined.
176

177
Unlike [`@spawnat`](@ref), `@everywhere` does not capture any local variables.
178
Instead, local variables can be broadcast using interpolation:
179

180
    foo = 1
181
    @everywhere bar = \$foo
182

183
The optional argument `procs` allows specifying a subset of all
184
processes to execute the expression.
185

186
Similar to calling `remotecall_eval(Main, procs, expr)`, but with two extra features:
187

188
    - `using` and `import` statements run on the calling process first, to ensure
189
      packages are precompiled.
190
    - The current source file path used by `include` is propagated to other processes.
191
"""
192
macro everywhere(ex)
    # Call expression `procs()` that always refers to this module's `procs`,
    # independent of what `procs` means at the call site.
    allprocs = Expr(:call, GlobalRef(@__MODULE__, :procs))
    # Forward to the two-argument form with that call as the process list.
    return esc(:($(Distributed).@everywhere $allprocs $ex))
end
196

197
macro everywhere(procs, ex)
    imports = extract_imports(ex)
    local_imports = isempty(imports) ? nothing : Expr(:toplevel, imports...)
    # The user expression is shipped as a quoted value; escaping makes any `$`
    # interpolation inside it resolve in the caller's scope.
    quoted_ex = esc(Expr(:quote, ex))
    target_procs = esc(procs)
    return quote
        $local_imports # run imports locally first
        # The expression sent out first sets `:SOURCE_PATH` in task-local storage to
        # the value found on the calling side when this code runs, then evaluates
        # the user expression.
        let remote_ex = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), $quoted_ex),
            pids = $target_procs
            remotecall_eval(Main, pids, remote_ex)
        end
    end
end
207

208
"""
209
    remotecall_eval(m::Module, procs, expression)
210

211
Execute an expression under module `m` on the processes
212
specified in `procs`.
213
Errors on any of the processes are collected into a
214
[`CompositeException`](@ref) and thrown.
215

216
See also [`@everywhere`](@ref).
217
"""
218
function remotecall_eval(m::Module, procs, ex)
    @sync begin
        # Number of times the calling process itself appears in `procs`.
        local_runs = 0
        for pid in procs
            if pid != myid()
                @async_unwrap remotecall_wait(Core.eval, pid, m, ex)
            else
                local_runs += 1
            end
        end
        yield() # ensure that the remotecalls have had a chance to start

        # execute locally last as we do not want local execution to block serialization
        # of the request to remote nodes.
        for _ in 1:local_runs
            @async Core.eval(m, ex)
        end
    end
    return nothing
end
238

239
# optimized version of remotecall_eval for a single pid
240
# and which also fetches the return value
241
# Single-process method: evaluate `ex` in module `m` on process `pid` and return the
# value produced by `remotecall_fetch`.
remotecall_eval(m::Module, pid::Int, ex) = remotecall_fetch(Core.eval, pid, m, ex)
244

245

246
# Statically split range [firstIndex,lastIndex] into equal sized chunks for np processors
247
# Partition the index interval `firstIndex:lastIndex` into consecutive `UnitRange`s for
# `np` processors. Chunk sizes differ by at most one, with the larger chunks first;
# when there are fewer indices than processors only as many chunks as indices are
# produced.
function splitrange(firstIndex::Int, lastIndex::Int, np::Int)
    base, leftover = divrem(lastIndex - firstIndex + 1, np)
    nchunks = base > 0 ? np : leftover
    chunks = Vector{UnitRange{Int}}(undef, nchunks)
    start = firstIndex
    for i in eachindex(chunks)
        # the first `leftover` chunks absorb one extra index each
        len = i <= leftover ? base + 1 : base
        stop = start + len - 1
        chunks[i] = start:stop
        start = stop + 1
    end
    return chunks
end
263

264
# Split the index range of `R` into one chunk per worker, call
# `f(reducer, R, lo, hi)` for each chunk on a separate worker, and combine the
# per-chunk results with `reducer`.
function preduce(reducer, f, R)
    chunks = splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
    # one worker per chunk, taken from the front of the worker list
    pids = workers()[1:length(chunks)]

    tasks = Task[]
    for (pid, chunk) in zip(pids, chunks)
        task = Task(() -> remotecall_fetch(f, pid, reducer, R, first(chunk), last(chunk)))
        schedule(task)
        push!(tasks, task)
    end
    return reduce(reducer, Any[fetch(task) for task in tasks])
end
276

277
# Start a task that spawns `f(R, lo, hi)` for each chunk of `R`'s index range on an
# automatically chosen process and waits for all of them; return that task after
# passing it through `errormonitor`.
function pfor(f, R)
    driver = @async begin
        chunks = splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
        @sync for chunk in chunks
            @spawnat :any f(R, first(chunk), last(chunk))
        end
    end
    return errormonitor(driver)
end
283

284
# Build the expression for an anonymous function `(reducer, R, lo, hi)` that evaluates
# `body` with `var` bound to each of `R[lo]`, ..., `R[hi]` in turn and folds the values
# with `reducer`. `var` and `body` are escaped; the result is meant to be spliced into
# a macro expansion.
function make_preduce_body(var, body)
    loopvar = esc(var)
    term = esc(body)
    return quote
        function (reducer, R, lo::Int, hi::Int)
            # seed the accumulator with the value of `body` for the first element
            $loopvar = R[lo]
            acc = $term
            if lo != hi
                for $loopvar in R[(lo+1):hi]
                    acc = reducer(acc, $term)
                end
            end
            acc
        end
    end
end
298

299
# Build the expression for an anonymous function `(R, lo, hi)` that runs `body` once
# for every element of `R[lo:hi]`, with `var` bound to that element. `var` and `body`
# are escaped; the result is meant to be spliced into a macro expansion.
function make_pfor_body(var, body)
    loopvar = esc(var)
    stmt = esc(body)
    return quote
        function (R, lo::Int, hi::Int)
            for $loopvar in R[lo:hi]
                $stmt
            end
        end
    end
end
308

309
"""
310
    @distributed
311

312
A distributed-memory, parallel for loop of the form:
313

314
    @distributed [reducer] for var = range
315
        body
316
    end
317

318
The specified range is partitioned and locally executed across all workers. In case an
319
optional reducer function is specified, `@distributed` performs local reductions on each worker
320
with a final reduction on the calling process.
321

322
Note that without a reducer function, `@distributed` executes asynchronously, i.e. it spawns
323
independent tasks on all available workers and returns immediately without waiting for
324
completion. To wait for completion, prefix the call with [`@sync`](@ref), like:
325

326
    @sync @distributed for var = range
327
        body
328
    end
329
"""
330
macro distributed(args...)
    # Accepted forms: `@distributed loop` and `@distributed reducer loop`.
    na = length(args)
    if na == 1
        loop = args[1]
    elseif na == 2
        reducer = args[1]
        loop = args[2]
    else
        throw(ArgumentError("wrong number of arguments to @distributed"))
    end
    # The loop must be a `for` expression made of an iteration spec and a body.
    if !Meta.isexpr(loop, :for, 2)
        error("malformed @distributed loop")
    end
    # Only a single `var = range` iteration spec can be translated. Anything else
    # (for example `for i = a, j = b`, whose spec is a block of assignments) is
    # rejected here rather than being turned into invalid generated code.
    spec = loop.args[1]
    if !Meta.isexpr(spec, :(=), 2)
        error("malformed @distributed loop")
    end
    var = spec.args[1]
    r = spec.args[2]
    body = loop.args[2]
    # Drop a trailing line-number node so that the last element of the block is an
    # actual expression; the emptiness check keeps `args[end]` from throwing on an
    # empty block.
    if Meta.isexpr(body, :block) && !isempty(body.args) && body.args[end] isa LineNumberNode
        pop!(body.args)
    end
    if na == 1
        # Without a reducer: start the loop via `pfor` and, when the caller has the
        # sync variable as a local (e.g. inside `@sync`), add the returned task to it.
        syncvar = esc(Base.sync_varname)
        return quote
            local ref = pfor($(make_pfor_body(var, body)), $(esc(r)))
            if $(Expr(:islocal, syncvar))
                put!($syncvar, ref)
            end
            ref
        end
    else
        # With a reducer: the expansion is the `preduce` call itself.
        return :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r))))
    end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc