• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37584

pending completion
#37584

push

local

web-flow
relax assertion involving pg->nold to reflect that it may be a bit inaccurate with parallel marking (#50466)

70958 of 83746 relevant lines covered (84.73%)

23916169.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.14
/stdlib/Distributed/src/workerpool.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
    AbstractWorkerPool

Supertype for worker pools such as [`WorkerPool`](@ref) and [`CachingPool`](@ref).
An `AbstractWorkerPool` should implement:
  - [`push!`](@ref) - add a new worker to the overall pool (available + busy)
  - [`put!`](@ref) - return a worker to the available pool
  - [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution)
  - [`length`](@ref) - number of workers in the overall pool (available + busy)
  - [`isready`](@ref) - return false if a `take!` on the pool would block, else true

The default implementations of the above (on an `AbstractWorkerPool`) require fields
  - `channel::Channel{Int}`
  - `workers::Set{Int}`
where `channel` contains free worker pids and `workers` is the set of all workers associated with this pool.
`workers` and `nworkers` also have default implementations built on these fields.
"""
abstract type AbstractWorkerPool end
20

21
mutable struct WorkerPool <: AbstractWorkerPool
    # Free worker pids: handed out by `take!`, given back by `put!`.
    channel::Channel{Int}
    # Every worker that belongs to this pool, whether busy or idle.
    workers::Set{Int}
    # Handle through which copies of this pool on other processes forward their
    # operations to the process that owns it.
    ref::RemoteChannel

    # A freshly built pool has no members; workers are added afterwards with `push!`.
    function WorkerPool(c::Channel, ref::RemoteChannel)
        return new(c, Set{Int}(), ref)
    end
end
28

29
function WorkerPool()
    # An effectively unbounded channel of free pids, plus the `RemoteChannel` that
    # remote copies of this pool will use to reach it.
    pool = WorkerPool(Channel{Int}(typemax(Int)), RemoteChannel())
    # Publish only a weak reference: forwarded calls read it back via `.value`.
    put!(pool.ref, WeakRef(pool))
    return pool
end
34

35
"""
    WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Create a `WorkerPool` from a vector or range of worker ids.

# Examples
```julia-repl
\$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
```
"""
function WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
    pool = WorkerPool()
    # Every id becomes a pool member and is immediately available to `take!`.
    foreach(w -> push!(pool, w), workers)
    return pool
end
56

57
# Rebuilds a pool from just its `ref` (used when a pool arrives by deserialization).
# The one-slot local channel is a placeholder: operations on such a copy are forwarded
# to the process owning `ref` whenever that is not the current process.
function WorkerPool(ref::RemoteChannel)
    placeholder = Channel{Int}(1)
    return WorkerPool(placeholder, ref)
end
59

60
# Make a worker pool usable from other processes. Only the type tag and `ref` are
# written; the receiving side rebuilds a proxy whose `put!`, `take!`, etc. are
# forwarded to the process owning `ref` (and hence the pool).
function serialize(S::AbstractSerializer, pool::WorkerPool)
    Serialization.serialize_type(S, typeof(pool))
    return serialize(S, pool.ref)
end
68

69
# Counterpart of `serialize(::AbstractSerializer, ::WorkerPool)`: read back the `ref`
# and wrap it in a pool of type `T`.
function deserialize(S::AbstractSerializer, t::Type{T}) where {T<:WorkerPool}
    ref = deserialize(S)
    return T(ref)
end
70

71
# Register `w` as a member of `pool` and make it available to `take!`; returns `pool`.
function wp_local_push!(pool::AbstractWorkerPool, w::Int)
    push!(pool.workers, w)
    put!(pool.channel, w)
    return pool
end
72
# Number of members of `pool`, counting busy and idle workers alike.
function wp_local_length(pool::AbstractWorkerPool)
    return length(pool.workers)
end
73
# `true` when a free pid is waiting in the channel, i.e. `take!` would not block.
function wp_local_isready(pool::AbstractWorkerPool)
    return isready(pool.channel)
end
74

75
function wp_local_put!(pool::AbstractWorkerPool, w::Int)
    # Only actual members are made available again. The master, which an empty
    # default worker pool hands out implicitly, is never in `pool.workers`, so
    # putting it back is a no-op.
    if w in pool.workers
        put!(pool.channel, w)
    end
    return w
end
82

83
function wp_local_workers(pool::AbstractWorkerPool)
    # An empty default pool stands for "run on the master", so report pid 1.
    master_only = length(pool) == 0 && pool === default_worker_pool()
    return master_only ? [1] : collect(pool.workers)
end
90

91
function wp_local_nworkers(pool::AbstractWorkerPool)
    # An empty default pool stands for "run on the master", which counts as one worker.
    master_only = length(pool) == 0 && pool === default_worker_pool()
    return master_only ? 1 : length(pool.workers)
end
98

99
# Take a free, still-active worker from `pool`, blocking on the channel until one is
# put back. Workers no longer present in the cluster are dropped from the pool and
# skipped. An empty default pool yields the master (pid 1); any other empty pool
# throws an `ErrorException`.
function wp_local_take!(pool::AbstractWorkerPool)
    while true
        if length(pool) == 0
            pool === default_worker_pool() && return 1
            throw(ErrorException("No active worker available in pool"))
        end

        candidate = take!(pool.channel)
        id_in_procs(candidate) && return candidate
        # Stale pid: forget it and look again.
        delete!(pool.workers, candidate)
    end
end
122

123
# Run `rc_f(f, pid, args...; kwargs...)` on a worker taken from `pool`, and always put
# that worker back afterwards, even if the call throws.
# NOTE(review): the worker is returned as soon as `rc_f` returns, so for a non-blocking
# `rc_f` such as `remotecall` it is back in the pool before the remote work finishes.
function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
    pid = take!(pool)
    try
        return rc_f(f, pid, args...; kwargs...)
    finally
        put!(pool, pid)
    end
end
131

132
# Check if pool is local or remote and forward calls if required.
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
# it avoids the overhead associated with a local remotecall.
#
# Each entry pairs a query with the type asserted on its forwarded result.
for (func, rettype) in ((:length, Int), (:isready, Bool), (:workers, Vector{Int}),
                        (:nworkers, Int), (:take!, Int))
    localfunc = Symbol(:wp_local_, func)
    @eval begin
        # WorkerPool: answer directly when this process owns the pool; otherwise run
        # the `wp_local_*` helper on the owner against the pool stored (as a WeakRef)
        # in `ref`.
        function ($func)(pool::WorkerPool)
            owner = pool.ref.where
            owner == myid() && return ($localfunc)(pool)
            return remotecall_fetch(ref->($localfunc)(fetch(ref).value), owner, pool.ref)::$rettype
        end

        # Other AbstractWorkerPool subtypes are always handled locally.
        ($func)(pool::AbstractWorkerPool) = ($localfunc)(pool)
    end
end
151

152
# Same local-or-forward scheme for the operations that take a worker id.
for func in (:push!, :put!)
    localfunc = Symbol(:wp_local_, func)
    @eval begin
        # WorkerPool: mutate directly when this process owns the pool; otherwise apply
        # the `wp_local_*` helper on the owner to the pool stored in `ref`.
        function ($func)(pool::WorkerPool, w::Int)
            owner = pool.ref.where
            owner == myid() && return ($localfunc)(pool, w)
            return remotecall_fetch((ref, w)->($localfunc)(fetch(ref).value, w), owner, pool.ref, w)
        end

        # Other AbstractWorkerPool subtypes are always handled locally.
        ($func)(pool::AbstractWorkerPool, w::Int) = ($localfunc)(pool, w)
    end
end
167

168

169
"""
    remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

[`WorkerPool`](@ref) variant of `remotecall(f, pid, ....)`. Wait for and take a free worker from `pool` and perform a `remotecall` on it.
The worker is put back into `pool` as soon as `remotecall` returns.

# Examples
```julia-repl
\$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
```
In this example, the task ran on pid 2, called from pid 1.
"""
function remotecall(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remotecall, f, pool, args...; kwargs...)
end
188

189

190
"""
    remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

[`WorkerPool`](@ref) variant of `remotecall_wait(f, pid, ....)`. Wait for and take a free worker from `pool` and
perform a `remotecall_wait` on it. The worker is put back into `pool` once the call returns.

# Examples
```julia-repl
\$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
```
"""
function remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remotecall_wait, f, pool, args...; kwargs...)
end
212

213

214
"""
    remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

[`WorkerPool`](@ref) variant of `remotecall_fetch(f, pid, ....)`. Wait for and take a free worker from `pool` and
perform a `remotecall_fetch` on it. The worker is put back into `pool` once the result has been fetched
(or the call has thrown).

# Examples
```julia-repl
\$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
```
"""
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall_fetch, f, pool, args...; kwargs...)
233

234
"""
    remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

[`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and
perform a `remote_do` on it. The worker is put back into `pool` as soon as `remote_do` returns.
"""
function remote_do(f, pool::AbstractWorkerPool, args...; kwargs...)
    return remotecall_pool(remote_do, f, pool, args...; kwargs...)
end
241

242
# Process-wide default pool. It stays `nothing` until `default_worker_pool()` first
# creates or fetches it, or until `default_worker_pool!` installs one.
const _default_worker_pool = Ref{Union{Nothing, AbstractWorkerPool}}(nothing)
243

244
"""
    default_worker_pool()

[`AbstractWorkerPool`](@ref) containing idle [`workers`](@ref) - used by `remote(f)` and [`pmap`](@ref)
(by default). Unless one is explicitly set via `default_worker_pool!(pool)`, the default worker pool is
initialized to a [`WorkerPool`](@ref). On processes other than the master, the pool is obtained from
process 1 the first time it is accessed.

# Examples
```julia-repl
\$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
```
"""
function default_worker_pool()
    # Lazily initialize on first access: the master builds a new pool, every other
    # process asks the master for its default pool.
    if isnothing(_default_worker_pool[])
        _default_worker_pool[] = if myid() == 1
            WorkerPool()
        else
            remotecall_fetch(()->default_worker_pool(), 1)
        end
    end
    return _default_worker_pool[]
end
271

272
"""
    default_worker_pool!(pool::AbstractWorkerPool)

Set an [`AbstractWorkerPool`](@ref) to be used by `remote(f)` and [`pmap`](@ref) (by default).
Return `pool`.
"""
function default_worker_pool!(pool::AbstractWorkerPool)
    _default_worker_pool[] = pool
    return pool
end
280

281
"""
    remote([p::AbstractWorkerPool], f) -> Function

Return an anonymous function that executes function `f` on an available worker using
[`remotecall_fetch`](@ref). The worker is drawn from `p` if provided; otherwise from
[`default_worker_pool`](@ref), which is looked up each time the returned function is called.
"""
function remote(f)
    return (args...; kwargs...) -> remotecall_fetch(f, default_worker_pool(), args...; kwargs...)
end

function remote(p::AbstractWorkerPool, f)
    return (args...; kwargs...) -> remotecall_fetch(f, p, args...; kwargs...)
end
289

290
mutable struct CachingPool <: AbstractWorkerPool
    # Free worker pids: handed out by `take!`, given back by `put!`.
    channel::Channel{Int}
    # Every worker that belongs to this pool, whether busy or idle.
    workers::Set{Int}

    # Mapping between a tuple (worker_id, f) and a RemoteChannel
    map_obj2ref::IdDict{Tuple{Int, Function}, RemoteChannel}

    function CachingPool()
        cache = IdDict{Tuple{Int, Function}, RemoteChannel}()
        # `finalizer` returns its argument, so the new pool can be returned directly;
        # `clear!` runs when the pool is garbage collected.
        return finalizer(clear!, new(Channel{Int}(typemax(Int)), Set{Int}(), cache))
    end
end
303

304
# A CachingPool owns per-worker cache handles and is deliberately kept process-local.
function serialize(s::AbstractSerializer, cp::CachingPool)
    throw(ErrorException("CachingPool objects are not serializable."))
end
305

306
"""
    CachingPool(workers::Union{Vector{Int},AbstractRange{Int}})

An implementation of an `AbstractWorkerPool`.
[`remote`](@ref), [`remotecall_fetch`](@ref),
[`pmap`](@ref) (and other remote calls which execute functions remotely)
benefit from caching the serialized/deserialized functions on the worker nodes,
especially closures (which may capture large amounts of data).

The remote cache is maintained for the lifetime of the returned `CachingPool` object.
To clear the cache earlier, use `clear!(pool)`.

For global variables, only the bindings are captured in a closure, not the data.
`let` blocks can be used to capture global data.

# Examples
```julia
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end
```

The above would transfer `foo` only once to each worker.

"""
function CachingPool(workers::Union{Vector{Int},AbstractRange{Int}})
    pool = CachingPool()
    # Accept ranges as well as vectors, matching `WorkerPool(workers)`.
    foreach(w -> push!(pool, w), workers)
    return pool
end
340

341
"""
    clear!(pool::CachingPool) -> pool

Remove all cached functions from all participating workers.
"""
function clear!(pool::CachingPool)
    # Finalize every tracked cache handle, then forget them all.
    foreach(finalize, values(pool.map_obj2ref))
    empty!(pool.map_obj2ref)
    return pool
end
353

354
# Cache hit: the function was stored in `rr` by an earlier call; fetch and apply it.
function exec_from_cache(rr::RemoteChannel, args...; kwargs...)
    cached = fetch(rr)
    return cached(args...; kwargs...)
end

# Cache miss: the function travels alongside its channel. Store it in the channel so
# later calls can send just the channel, then apply it.
function exec_from_cache(f_ref::Tuple{Function, RemoteChannel}, args...; kwargs...)
    func, store = f_ref
    put!(store, func)
    return func(args...; kwargs...)
end
359

360
# Run `rc_f` on a worker taken from a CachingPool, shipping `f` to that worker only
# on the first call for a given (worker, f) pair. Later calls send just the tracked
# RemoteChannel, from which the worker fetches its cached copy of `f`.
function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
    worker = take!(pool)
    try
        key = (worker, f)
        # Lazy default: a RemoteChannel is created only on a cache miss, instead of
        # being allocated (and discarded) on every call, including cache hits.
        f_ref = get(() -> (f, RemoteChannel(worker)), pool.map_obj2ref, key)
        isa(f_ref, Tuple) && (pool.map_obj2ref[key] = f_ref[2])   # Add to tracker

        return rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
    finally
        # Return the worker even if the cache lookup or the remote call fails.
        put!(pool, worker)
    end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc