• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37433

pending completion
#37433

push

local

web-flow
Merge pull request #48513 from JuliaLang/jn/extend-once

ensure extension triggers are only run by the package that satisfied them

60 of 60 new or added lines in 1 file covered. (100.0%)

72324 of 82360 relevant lines covered (87.81%)

31376331.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.81
/stdlib/Distributed/src/workerpool.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

"""
    AbstractWorkerPool

Supertype for worker pools such as [`WorkerPool`](@ref) and [`CachingPool`](@ref).
An `AbstractWorkerPool` should implement:
  - [`push!`](@ref) - add a new worker to the overall pool (available + busy)
  - [`put!`](@ref) - return a worker to the available pool
  - [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution)
  - [`length`](@ref) - number of workers in the overall pool (available + busy)
  - [`isready`](@ref) - return `false` if a `take!` on the pool would block, else `true`

The default implementations of the above (on an `AbstractWorkerPool`) require fields
  - `channel::Channel{Int}`
  - `workers::Set{Int}`
where `channel` contains free worker pids and `workers` is the set of all workers associated with this pool.
"""
abstract type AbstractWorkerPool end
20

mutable struct WorkerPool <: AbstractWorkerPool
    # Pids of the workers that are currently free.
    channel::Channel{Int}
    # Every worker pid that belongs to this pool, free or busy.
    workers::Set{Int}
    # Handle used by non-owning processes to forward pool operations to the owner.
    ref::RemoteChannel

    # The pool starts with an empty worker set; members are added via push!.
    function WorkerPool(c::Channel, ref::RemoteChannel)
        return new(c, Set{Int}(), ref)
    end
end
28

function WorkerPool()
    # Build an empty, effectively unbounded pool owned by this process.
    pool = WorkerPool(Channel{Int}(typemax(Int)), RemoteChannel())
    # Publish a weak handle to the pool through its own `ref`, so code running on the
    # owner can reach it via `fetch(ref).value`.
    put!(pool.ref, WeakRef(pool))
    return pool
end
34

"""
    WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Create a `WorkerPool` from a vector or range of worker ids.

# Examples
```julia-repl
\$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
```
"""
function WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
    pool = WorkerPool()
    # Each push! records the pid as a pool member and also marks it as free,
    # so the channel size ends up equal to the number of workers given.
    for w in workers
        push!(pool, w)
    end
    return pool
end
56

# Used on processes a pool was serialized to: operations are forwarded through `ref`,
# so the local channel is only a one-slot placeholder.
function WorkerPool(ref::RemoteChannel)
    placeholder = Channel{Int}(1)
    return WorkerPool(placeholder, ref)
end
59

function serialize(S::AbstractSerializer, pool::WorkerPool)
    # A WorkerPool travels as its type tag plus its `ref` handle only. Processes that
    # receive it forward put!, take!, etc. to the process owning `ref`, and therefore
    # to the real pool.
    Serialization.serialize_type(S, typeof(pool))
    return serialize(S, pool.ref)
end
68

# Counterpart of serialize(::AbstractSerializer, ::WorkerPool): read the `ref`
# handle back and wrap it in a proxy pool of the serialized type.
function deserialize(S::AbstractSerializer, t::Type{T}) where {T<:WorkerPool}
    ref = deserialize(S)
    return T(ref)
end
70

# Owner-side implementations that act directly on the `workers` set and `channel`.

function wp_local_push!(pool::AbstractWorkerPool, w::Int)
    push!(pool.workers, w)   # register `w` as a member of the pool
    put!(pool.channel, w)    # and make it available for work
    return pool
end

# Number of member workers (free and busy).
wp_local_length(pool::AbstractWorkerPool) = length(pool.workers)

# True when a take! on the channel would not block.
wp_local_isready(pool::AbstractWorkerPool) = isready(pool.channel)
74

function wp_local_put!(pool::AbstractWorkerPool, w::Int)
    # For the default worker pool, the master is implicitly treated as a worker
    # without being listed in `pool.workers`. Only pids that really are members
    # are made available again.
    if w in pool.workers
        put!(pool.channel, w)
    end
    return w
end
82

function wp_local_workers(pool::AbstractWorkerPool)
    # An empty default pool falls back to the master process (pid 1).
    master_only = length(pool) == 0 && pool === default_worker_pool()
    return master_only ? [1] : collect(pool.workers)
end
90

function wp_local_nworkers(pool::AbstractWorkerPool)
    # An empty default pool counts the master process as its single worker.
    length(pool) == 0 && pool === default_worker_pool() && return 1
    return length(pool.workers)
end
98

function wp_local_take!(pool::AbstractWorkerPool)
    # Keep drawing free pids until one belongs to a live process. Pids whose
    # process is gone are dropped from the pool along the way.
    while true
        if length(pool) == 0
            # Nothing left to draw from. Only the default pool may fall back to
            # running work on the master process itself.
            if pool !== default_worker_pool()
                throw(ErrorException("No active worker available in pool"))
            end
            return 1
        end

        candidate = take!(pool.channel)
        id_in_procs(candidate) && return candidate
        delete!(pool.workers, candidate)   # stale worker: remove it from the pool
    end
end
122

function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
    # Borrow a worker for the duration of `rc_f` and always hand it back,
    # even when the call throws.
    pid = take!(pool)
    try
        return rc_f(f, pid, args...; kwargs...)
    finally
        put!(pool, pid)
    end
end
131

# Check if pool is local or remote and forward calls if required.
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
# it avoids the overhead associated with a local remotecall.
for (func, rt) in ((:length, Int), (:isready, Bool), (:workers, Vector{Int}),
                   (:nworkers, Int), (:take!, Int))
    func_local = Symbol("wp_local_", func)
    @eval begin
        # On the owning process, call the local implementation directly. Otherwise
        # run it on the owner against the pool reachable through `ref`, and assert
        # the expected return type.
        function ($func)(pool::WorkerPool)
            owner = pool.ref.where
            if owner == myid()
                return ($func_local)(pool)
            end
            return remotecall_fetch(ref->($func_local)(fetch(ref).value), owner, pool.ref)::$rt
        end

        # Default: other pool types are handled by the local implementation.
        ($func)(pool::AbstractWorkerPool) = ($func_local)(pool)
    end
end
151

for func in (:push!, :put!)
    func_local = Symbol("wp_local_", func)
    @eval begin
        # Mutations of a WorkerPool are applied on the process that owns it.
        function ($func)(pool::WorkerPool, w::Int)
            owner = pool.ref.where
            if owner == myid()
                return ($func_local)(pool, w)
            end
            return remotecall_fetch((ref, w)->($func_local)(fetch(ref).value, w), owner, pool.ref, w)
        end

        # Default: other pool types are handled by the local implementation.
        ($func)(pool::AbstractWorkerPool, w::Int) = ($func_local)(pool, w)
    end
end
167

168

"""
    remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

[`WorkerPool`](@ref) variant of `remotecall(f, pid, ....)`. Wait for a free worker in `pool`,
take it, and perform a `remotecall` on it. The worker is put back into `pool` once
`remotecall` has returned.

# Examples
```julia-repl
\$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
```
In this example, the task ran on pid 2, called from pid 1.
"""
function remotecall(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remotecall, f, pool, args...; kwargs...)
end
188

189

"""
    remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

[`WorkerPool`](@ref) variant of `remotecall_wait(f, pid, ....)`. Wait for a free worker in
`pool`, take it, and perform a `remotecall_wait` on it. The worker is put back into `pool`
once `remotecall_wait` has returned.

# Examples
```julia-repl
\$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
```
"""
function remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remotecall_wait, f, pool, args...; kwargs...)
end
212

213

"""
    remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

[`WorkerPool`](@ref) variant of `remotecall_fetch(f, pid, ....)`. Waits for a free worker in
`pool`, takes it, performs a `remotecall_fetch` on it, and puts the worker back into `pool`
afterwards.

# Examples
```julia-repl
\$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
```
"""
function remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remotecall_fetch, f, pool, args...; kwargs...)
end
233

"""
    remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

[`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for a free worker in `pool`,
take it, and perform a `remote_do` on it. The worker is put back into `pool` once
`remote_do` has returned.
"""
function remote_do(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remote_do, f, pool, args...; kwargs...)
end
241

# Per-process cache of the default pool; `nothing` until default_worker_pool() fills it.
const _default_worker_pool = Ref{Union{Nothing, WorkerPool}}(nothing)
243

"""
    default_worker_pool()

Return the [`WorkerPool`](@ref) containing idle [`workers`](@ref). It is used by `remote(f)`
and, by default, by [`pmap`](@ref).

# Examples
```julia-repl
\$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
```
"""
function default_worker_pool()
    if isnothing(_default_worker_pool[])
        # First access on this process. The master (pid 1) creates the pool;
        # any other process retrieves the master's pool.
        _default_worker_pool[] = myid() == 1 ? WorkerPool() :
                                 remotecall_fetch(()->default_worker_pool(), 1)
    end
    return _default_worker_pool[]
end
269

"""
    remote([p::AbstractWorkerPool], f) -> Function

Return an anonymous function that executes function `f` on an available worker using
[`remotecall_fetch`](@ref). The worker is drawn from `p` if it is provided, otherwise from
[`default_worker_pool`](@ref).
"""
function remote(f)
    # The default pool is looked up each time the returned function is called.
    return (args...; kwargs...)->remotecall_fetch(f, default_worker_pool(), args...; kwargs...)
end

function remote(p::AbstractWorkerPool, f)
    return (args...; kwargs...)->remotecall_fetch(f, p, args...; kwargs...)
end
278

mutable struct CachingPool <: AbstractWorkerPool
    channel::Channel{Int}   # pids of the workers that are currently free
    workers::Set{Int}       # every pid belonging to this pool

    # (worker pid, function) => RemoteChannel on that worker that caches the function.
    map_obj2ref::IdDict{Tuple{Int, Function}, RemoteChannel}

    function CachingPool()
        cache = IdDict{Tuple{Int, Function}, RemoteChannel}()
        pool = new(Channel{Int}(typemax(Int)), Set{Int}(), cache)
        # Run clear! on the pool when it is garbage collected.
        finalizer(clear!, pool)
        return pool
    end
end
292

function serialize(s::AbstractSerializer, cp::CachingPool)
    # A CachingPool cannot be sent to another process.
    throw(ErrorException("CachingPool objects are not serializable."))
end
294

"""
    CachingPool(workers::Union{Vector{Int},AbstractRange{Int}})

An implementation of an `AbstractWorkerPool`, created from a vector or range of worker ids.
[`remote`](@ref), [`remotecall_fetch`](@ref),
[`pmap`](@ref) (and other remote calls which execute functions remotely)
benefit from caching the serialized/deserialized functions on the worker nodes,
especially closures (which may capture large amounts of data).

The remote cache is maintained for the lifetime of the returned `CachingPool` object.
To clear the cache earlier, use `clear!(pool)`.

For global variables, only the bindings are captured in a closure, not the data.
`let` blocks can be used to capture global data.

# Examples
```julia
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end
```

The above would transfer `foo` only once to each worker.
"""
function CachingPool(workers::Union{Vector{Int},AbstractRange{Int}})
    # Accepts the same worker-id containers as `WorkerPool(workers)`.
    pool = CachingPool()
    for w in workers
        push!(pool, w)   # register the pid and mark it as free
    end
    return pool
end
329

"""
    clear!(pool::CachingPool) -> pool

Remove all cached functions from all participating workers.
"""
function clear!(pool::CachingPool)
    # Finalize every cached RemoteChannel, then forget the mappings.
    foreach(finalize, values(pool.map_obj2ref))
    empty!(pool.map_obj2ref)
    return pool
end
342

# Cache hit: the function is already stored in the channel, so fetch it and call it.
exec_from_cache(rr::RemoteChannel, args...; kwargs...) = fetch(rr)(args...; kwargs...)

# Cache miss: the function was shipped along with the channel. Store it for later
# calls, then call it.
function exec_from_cache(f_ref::Tuple{Function, RemoteChannel}, args...; kwargs...)
    fn, chan = f_ref
    put!(chan, fn)
    return fn(args...; kwargs...)
end
348

"""
    remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)

Run `f` on a worker borrowed from `pool` via `rc_f` (e.g. `remotecall_fetch`), caching
`f` on that worker.

- The first call for a given `(worker, f)` pair ships `f` together with a fresh
  `RemoteChannel` on the worker, which stores it (see `exec_from_cache`).
- Later calls send only the channel.

The worker is always returned to `pool`, even if creating the channel or the call throws.
"""
function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
    worker = take!(pool)
    try
        key = (worker, f)
        cached = get(pool.map_obj2ref, key, nothing)
        if cached === nothing
            # Cache miss: create the remote channel only now. An eager `get` default
            # would create (and discard) one on every call, costing a remote round trip.
            rr = RemoteChannel(worker)
            pool.map_obj2ref[key] = rr   # track it so clear! can release it
            f_ref = (f, rr)
        else
            f_ref = cached
        end
        return rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
    finally
        put!(pool, worker)
    end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc