• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37592

pending completion
#37592

push

local

web-flow
Print out module in more places when we abort (#50723)

70869 of 83602 relevant lines covered (84.77%)

32000283.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.0
/stdlib/Distributed/src/macros.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
let nextidx = Threads.Atomic{Int}(0)
    global nextproc
    # Round-robin process selection used by `spawn_somewhere` and `@fetch`.
    #
    # Returns the id of the next process to use, cycling through `workers()`.
    function nextproc()
        # `atomic_add!` returns the value held before the increment, so concurrent
        # callers each obtain a distinct ticket.
        idx = Threads.atomic_add!(nextidx, 1)
        # Take one snapshot of the worker list. Querying `workers()` and `nworkers()`
        # separately can disagree if a worker joins or leaves in between, which would
        # produce an out-of-bounds index.
        pool = workers()
        # `mod` (unlike `%`) never returns a negative value for a positive divisor, so
        # the index stays valid even after the counter wraps past `typemax(Int)`.
        return pool[mod(idx, length(pool)) + 1]
    end
end
10

11
# Run `thunk` on process `p`. This is `remotecall` with the argument order swapped so
# that the target process comes first.
function spawnat(p, thunk)
    return remotecall(thunk, p)
end
214✔
12

13
# Run `thunk` on whichever process `nextproc()` selects.
function spawn_somewhere(thunk)
    target = nextproc()
    return spawnat(target, thunk)
end
×
14

15
"""
16
    @spawn expr
17

18
Create a closure around an expression and run it on an automatically-chosen process,
19
returning a [`Future`](@ref) to the result.
20
This macro is deprecated; `@spawnat :any expr` should be used instead.
21

22
# Examples
23
```julia-repl
24
julia> addprocs(3);
25

26
julia> f = @spawn myid()
27
Future(2, 1, 5, nothing)
28

29
julia> fetch(f)
30
2
31

32
julia> f = @spawn myid()
33
Future(3, 1, 7, nothing)
34

35
julia> fetch(f)
36
3
37
```
38

39
!!! compat "Julia 1.3"
40
    As of Julia 1.3 this macro is deprecated. Use `@spawnat :any` instead.
41
"""
42
macro spawn(expr)
    # Wrap the user expression in a zero-argument closure evaluated in caller scope.
    closure = esc(:(() -> ($expr)))
    syncvar = esc(Base.sync_varname)
    return quote
        local future = spawn_somewhere($closure)
        # When the variable named by `Base.sync_varname` is a local at the call site,
        # hand the new reference to it.
        if $(Expr(:islocal, syncvar))
            put!($syncvar, future)
        end
        future
    end
end
53

54
"""
55
    @spawnat p expr
56

57
Create a closure around an expression and run the closure
58
asynchronously on process `p`. Return a [`Future`](@ref) to the result.
59
If `p` is the quoted literal symbol `:any`, then the system will pick a
60
process to use automatically.
61

62
# Examples
63
```julia-repl
64
julia> addprocs(3);
65

66
julia> f = @spawnat 2 myid()
67
Future(2, 1, 3, nothing)
68

69
julia> fetch(f)
70
2
71

72
julia> f = @spawnat :any myid()
73
Future(3, 1, 7, nothing)
74

75
julia> fetch(f)
76
3
77
```
78

79
!!! compat "Julia 1.3"
80
    The `:any` argument is available as of Julia 1.3.
81
"""
82
macro spawnat(p, expr)
    # Wrap the user expression in a zero-argument closure evaluated in caller scope.
    closure = esc(:(() -> ($expr)))
    syncvar = esc(Base.sync_varname)
    # A literal `:any` reaches the macro as a `QuoteNode`; in that case the target
    # process is chosen automatically, otherwise `p` is evaluated in caller scope.
    launch = if p === QuoteNode(:any)
        :(spawn_somewhere($closure))
    else
        :(spawnat($(esc(p)), $closure))
    end
    return quote
        local future = $launch
        # When the variable named by `Base.sync_varname` is a local at the call site,
        # hand the new reference to it.
        if $(Expr(:islocal, syncvar))
            put!($syncvar, future)
        end
        future
    end
end
98

99
"""
100
    @fetch expr
101

102
Equivalent to `fetch(@spawnat :any expr)`.
103
See [`fetch`](@ref) and [`@spawnat`](@ref).
104

105
# Examples
106
```julia-repl
107
julia> addprocs(3);
108

109
julia> @fetch myid()
110
2
111

112
julia> @fetch myid()
113
3
114

115
julia> @fetch myid()
116
4
117

118
julia> @fetch myid()
119
2
120
```
121
"""
122
macro fetch(expr)
    # Zero-argument closure around the user expression, evaluated in caller scope.
    closure = esc(:(() -> ($expr)))
    # The target process is picked by `nextproc()` when the generated code runs.
    return :(remotecall_fetch($closure, nextproc()))
end
126

127
"""
128
    @fetchfrom p expr
129

130
Equivalent to `fetch(@spawnat p expr)`.
131
See [`fetch`](@ref) and [`@spawnat`](@ref).
132

133
# Examples
134
```julia-repl
135
julia> addprocs(3);
136

137
julia> @fetchfrom 2 myid()
138
2
139

140
julia> @fetchfrom 4 myid()
141
4
142
```
143
"""
144
macro fetchfrom(p, expr)
    # Both the process id and the wrapped expression are evaluated in caller scope.
    pid = esc(p)
    closure = esc(:(() -> ($expr)))
    return :(remotecall_fetch($closure, $pid))
end
148

149
# Collect the `import` / `using` expressions found in an expression.
#
# `extract_imports!(imports, x)` appends to `imports` and returns it. Only `:toplevel`
# and `:block` expressions and the body of a `:let` are searched; anything else
# (including non-`Expr` values) contributes nothing.
extract_imports!(imports, _) = imports
function extract_imports!(imports, ex::Expr)
    head = ex.head
    if head === :import || head === :using
        push!(imports, ex)
    elseif head === :let
        # args[1] holds the bindings, args[2] the body
        extract_imports!(imports, ex.args[2])
    elseif head === :toplevel || head === :block
        foreach(arg -> extract_imports!(imports, arg), ex.args)
    end
    return imports
end

# Non-mutating entry point: start from an empty `Any[]` list.
extract_imports(x) = extract_imports!(Any[], x)
2✔
164

165
"""
166
    @everywhere [procs()] expr
167

168
Execute an expression under `Main` on all `procs`.
169
Errors on any of the processes are collected into a
170
[`CompositeException`](@ref) and thrown. For example:
171

172
    @everywhere bar = 1
173

174
will define `Main.bar` on all current processes. Any processes added later
175
(say with [`addprocs()`](@ref)) will not have the expression defined.
176

177
Unlike [`@spawnat`](@ref), `@everywhere` does not capture any local variables.
178
Instead, local variables can be broadcast using interpolation:
179

180
    foo = 1
181
    @everywhere bar = \$foo
182

183
The optional argument `procs` allows specifying a subset of all
184
processes to execute the expression.
185

186
Similar to calling `remotecall_eval(Main, procs, expr)`, but with two extra features:
187

188
    - `using` and `import` statements run on the calling process first, to ensure
189
      packages are precompiled.
190
    - The current source file path used by `include` is propagated to other processes.
191
"""
192
macro everywhere(ex)
    # No process list given: forward to the two-argument form with a call to this
    # module's `procs` function as the process list.
    allprocs = Expr(:call, GlobalRef(@__MODULE__, :procs))
    return esc(:($(Distributed).@everywhere $allprocs $ex))
end
196

197
macro everywhere(procs, ex)
    imports = extract_imports(ex)
    # Pieces spliced into the generated code below.
    local_imports = isempty(imports) ? nothing : Expr(:toplevel, imports...)
    quoted_ex = esc(Expr(:quote, ex))
    target_procs = esc(procs)
    return quote
        $local_imports # run imports locally first
        # The expression sent out first sets :SOURCE_PATH in task-local storage to the
        # value read here (the inner `$` belongs to the nested quote, so it is
        # evaluated when this generated code runs), then runs the user expression.
        let ex = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), $quoted_ex),
            procs = $target_procs
            remotecall_eval(Main, procs, ex)
        end
    end
end
207

208
"""
209
    remotecall_eval(m::Module, procs, expression)
210

211
Execute an expression under module `m` on the processes
212
specified in `procs`.
213
Errors on any of the processes are collected into a
214
[`CompositeException`](@ref) and thrown.
215

216
See also [`@everywhere`](@ref).
217
"""
218
function remotecall_eval(m::Module, procs, ex)
    @sync begin
        # How many times the calling process itself appears in `procs`.
        local_runs = 0
        for pid in procs
            if pid != myid()
                @async_unwrap remotecall_wait(Core.eval, pid, m, ex)
            else
                local_runs += 1
            end
        end
        yield() # ensure that the remotecalls have had a chance to start

        # Local evaluation goes last so that it cannot block serialization of the
        # requests to the remote processes.
        for _ in 1:local_runs
            @async Core.eval(m, ex)
        end
    end
    return nothing
end
238

239
# Single-process variant of `remotecall_eval`: evaluates `ex` in module `m` on process
# `pid` with one `remotecall_fetch` and, unlike the multi-process method, returns the
# value of the expression.
remotecall_eval(m::Module, pid::Int, ex) = remotecall_fetch(Core.eval, pid, m, ex)
244

245

246
# Statically split the index range [firstIndex, lastIndex] into contiguous chunks for
# `np` processors. Chunk lengths differ by at most one, with the longer chunks first.
# When there are fewer indices than processors, one single-index chunk per index is
# returned.
function splitrange(firstIndex::Int, lastIndex::Int, np::Int)
    base, leftover = divrem(lastIndex - firstIndex + 1, np)
    nchunks = base > 0 ? np : leftover
    chunks = Vector{UnitRange{Int}}(undef, nchunks)
    start = firstIndex
    for k in 1:nchunks
        # the first `leftover` chunks absorb one extra index each
        len = k <= leftover ? base + 1 : base
        chunks[k] = start:(start + len - 1)
        start += len
    end
    return chunks
end
263

264
# Split the index range of `R` into one chunk per worker, run
# `f(reducer, R, lo, hi)` for each chunk on its own worker, and combine the fetched
# per-chunk results with `reducer` on the calling process.
function preduce(reducer, f, R)
    chunks = splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
    pids = workers()[1:length(chunks)]

    tasks = Task[]
    for (pid, chunk) in zip(pids, chunks)
        lo, hi = first(chunk), last(chunk)
        # one task per chunk, each blocking on its own remote call
        job = Task(() -> remotecall_fetch(f, pid, reducer, R, lo, hi))
        push!(tasks, schedule(job))
    end
    partials = Any[fetch(t) for t in tasks]
    return reduce(reducer, partials)
end
276

277
# Start an asynchronous task that splits the index range of `R` into chunks, spawns
# `f(R, lo, hi)` for each chunk on an automatically chosen process, and waits for all
# of them. The task is passed through `errormonitor` and returned without waiting.
function pfor(f, R)
    runner = @async begin
        @sync for chunk in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
            @spawnat :any f(R, first(chunk), last(chunk))
        end
    end
    return errormonitor(runner)
end
283

284
# Build the expression for an anonymous function `(reducer, R, lo, hi)` that evaluates
# `body` with `var` bound to each element of `R[lo:hi]` and folds the results together
# with `reducer`. `var` and `body` are escaped so they resolve in the macro caller's
# scope.
function make_preduce_body(var, body)
    itervar = esc(var)
    expr = esc(body)
    return quote
        function (reducer, R, lo::Int, hi::Int)
            # the first element seeds the accumulator
            $itervar = R[lo]
            acc = $expr
            if lo != hi
                for $itervar in R[(lo+1):hi]
                    acc = reducer(acc, $expr)
                end
            end
            return acc
        end
    end
end
298

299
# Build the expression for an anonymous function `(R, lo, hi)` that runs `body` once
# for each element of `R[lo:hi]`, with `var` bound to that element. `var` and `body`
# are escaped so they resolve in the macro caller's scope.
function make_pfor_body(var, body)
    itervar = esc(var)
    expr = esc(body)
    return quote
        function (R, lo::Int, hi::Int)
            for $itervar in R[lo:hi]
                $expr
            end
            return nothing
        end
    end
end
308

309
"""
310
    @distributed
311

312
A distributed-memory, parallel for loop of the form:
313

314
    @distributed [reducer] for var = range
315
        body
316
    end
317

318
The specified range is partitioned and locally executed across all workers. In case an
319
optional reducer function is specified, `@distributed` performs local reductions on each worker
320
with a final reduction on the calling process.
321

322
Note that without a reducer function, `@distributed` executes asynchronously, i.e. it spawns
323
independent tasks on all available workers and returns immediately without waiting for
324
completion. To wait for completion, prefix the call with [`@sync`](@ref), like:
325

326
    @sync @distributed for var = range
327
        body
328
    end
329
"""
330
macro distributed(args...)
    # Accepted forms: `@distributed loop` and `@distributed reducer loop`.
    na = length(args)
    if na == 1
        loop = args[1]
    elseif na == 2
        reducer = args[1]
        loop = args[2]
    else
        throw(ArgumentError("wrong number of arguments to @distributed"))
    end
    if !isa(loop, Expr) || loop.head !== :for
        error("malformed @distributed loop")
    end
    # Only a single `var = range` (or `var in range`) iteration is supported. A loop
    # with several iterators (`for i = a, j = b`) has a `:block` here; destructuring
    # it as if it were one assignment would generate invalid code, so reject it.
    spec = loop.args[1]
    if !Meta.isexpr(spec, :(=), 2)
        error("malformed @distributed loop: expected a single `var = range` iteration")
    end
    var = spec.args[1]
    r = spec.args[2]
    body = loop.args[2]
    # Drop a trailing LineNumberNode from the loop body (presumably so that the last
    # real expression is the value of the body). The emptiness check avoids a
    # BoundsError on an empty block.
    if Meta.isexpr(body, :block) && !isempty(body.args) && body.args[end] isa LineNumberNode
        pop!(body.args)
    end
    if na == 1
        # No reducer: start the loop via `pfor` and, when the variable named by
        # `Base.sync_varname` is a local at the call site, hand the returned task to it.
        syncvar = esc(Base.sync_varname)
        return quote
            local ref = pfor($(make_pfor_body(var, body)), $(esc(r)))
            if $(Expr(:islocal, syncvar))
                put!($syncvar, ref)
            end
            ref
        end
    else
        # With a reducer: the expression evaluates to the result of `preduce`.
        return :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r))))
    end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc