• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37524

pending completion
#37524

push

local

web-flow
Fix dyld lock not getting unlocked on invalid threads. (#49446)

70720 of 81817 relevant lines covered (86.44%)

34557294.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.67
/stdlib/Distributed/src/remotecall.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
import Base: eltype
4

5
abstract type AbstractRemoteRef end
6

7
"""
8
    client_refs
9

10
Tracks whether a particular `AbstractRemoteRef`
11
(identified by its RRID) exists on this worker.
12

13
The `client_refs` lock is also used to synchronize access to `.refs` and associated `clientset` state.
14
"""
15
const client_refs = WeakKeyDict{AbstractRemoteRef, Nothing}() # used as a WeakKeySet
16

17
"""
18
    Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
19

20
A `Future` is a placeholder for a single computation
21
of unknown termination status and time.
22
For multiple potential computations, see `RemoteChannel`.
23
See `remoteref_id` for identifying an `AbstractRemoteRef`.
24
"""
25
mutable struct Future <: AbstractRemoteRef
26
    where::Int
27
    whence::Int
28
    id::Int
29
    lock::ReentrantLock
30
    @atomic v::Union{Some{Any}, Nothing}
31

32
    Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) =
201,591✔
33
        (r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r))
100,803✔
34

35
    Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4])  # Useful for creating dummy, zeroed-out instances
1✔
36
end
37

38
"""
39
    RemoteChannel(pid::Integer=myid())
40

41
Make a reference to a `Channel{Any}(1)` on process `pid`.
42
The default `pid` is the current process.
43

44
    RemoteChannel(f::Function, pid::Integer=myid())
45

46
Create references to remote channels of a specific size and type. `f` is a function that
47
when executed on `pid` must return an implementation of an `AbstractChannel`.
48

49
For example, `RemoteChannel(()->Channel{Int}(10), pid)`, will return a reference to a
50
channel of type `Int` and size 10 on `pid`.
51

52
The default `pid` is the current process.
53
"""
54
mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
55
    where::Int
56
    whence::Int
57
    id::Int
58

59
    function RemoteChannel{T}(w::Int, rrid::RRID) where T<:AbstractChannel
1,067✔
60
        r = new(w, rrid.whence, rrid.id)
1,067✔
61
        return test_existing_ref(r)
1,067✔
62
    end
63

64
    function RemoteChannel{T}(t::Tuple) where T<:AbstractChannel
1✔
65
        return new(t[1],t[2],t[3])
1✔
66
    end
67
end
68

69
function test_existing_ref(r::AbstractRemoteRef)
101,870✔
70
    found = getkey(client_refs, r, nothing)
102,673✔
71
    if found !== nothing
101,870✔
72
        @assert r.where > 0
803✔
73
        if isa(r, Future)
803✔
74
            # this is only for copying the reference from Future to RemoteRef (just created)
75
            fv_cache = @atomic :acquire found.v
×
76
            rv_cache = @atomic :monotonic r.v
×
77
            if fv_cache === nothing && rv_cache !== nothing
×
78
                # we have recd the value from another source, probably a deserialized ref, send a del_client message
79
                send_del_client(r)
×
80
                @lock found.lock begin
×
81
                    @atomicreplace found.v nothing => rv_cache
×
82
                end
83
            end
84
        end
85
        return found::typeof(r)
803✔
86
    end
87

88
    client_refs[r] = nothing
101,067✔
89
    finalizer(finalize_ref, r)
101,067✔
90
    return r
101,067✔
91
end
92

93
function finalize_ref(r::AbstractRemoteRef)
100,937✔
94
    if r.where > 0 # Handle the case of the finalizer having been called manually
100,937✔
95
        if trylock(client_refs.lock) # trylock doesn't call wait which causes yields
201,874✔
96
            try
100,937✔
97
                delete!(client_refs.ht, r) # direct removal avoiding locks
100,937✔
98
                if isa(r, RemoteChannel)
244✔
99
                    send_del_client_no_lock(r)
460✔
100
                else
101
                    # send_del_client only if the reference has not been set
102
                    v_cache = @atomic :monotonic r.v
100,693✔
103
                    v_cache === nothing && send_del_client_no_lock(r)
101,295✔
104
                    @atomic :monotonic r.v = nothing
100,693✔
105
                end
106
                r.where = 0
100,937✔
107
            finally
108
                unlock(client_refs.lock)
201,874✔
109
            end
110
        else
111
            finalizer(finalize_ref, r)
×
112
            return nothing
×
113
        end
114
    end
115
    nothing
100,937✔
116
end
117

118
"""
119
    Future(pid::Integer=myid())
120

121
Create a `Future` on process `pid`.
122
The default `pid` is the current process.
123
"""
124
Future(pid::Integer=myid()) = Future(pid, RRID())
100,794✔
125
Future(w::LocalProcess) = Future(w.id)
100,036✔
126
Future(w::Worker) = Future(w.id)
735✔
127

128
RemoteChannel(pid::Integer=myid()) = RemoteChannel{Channel{Any}}(pid, RRID())
257✔
129

130
function RemoteChannel(f::Function, pid::Integer=myid())
13✔
131
    remotecall_fetch(pid, f, RRID()) do f, rrid
13✔
132
        rv=lookup_ref(rrid, f)
7✔
133
        RemoteChannel{typeof(rv.c)}(myid(), rrid)
7✔
134
    end
135
end
136

137
Base.eltype(::Type{RemoteChannel{T}}) where {T} = eltype(T)
856✔
138

139
hash(r::AbstractRemoteRef, h::UInt) = hash(r.whence, hash(r.id, h))
419,260✔
140
==(r::AbstractRemoteRef, s::AbstractRemoteRef) = (r.whence==s.whence && r.id==s.id)
1,724✔
141

142
"""
143
    remoteref_id(r::AbstractRemoteRef) -> RRID
144

145
`Future`s and `RemoteChannel`s are identified by fields:
146

147
* `where` - refers to the node where the underlying object/storage
148
  referred to by the reference actually exists.
149

150
* `whence` - refers to the node the remote reference was created from.
151
  Note that this is different from the node where the underlying object
152
  referred to actually exists. For example calling `RemoteChannel(2)`
153
  from the master process would result in a `where` value of 2 and
154
  a `whence` value of 1.
155

156
* `id` is unique across all references created from the worker specified by `whence`.
157

158
Taken together,  `whence` and `id` uniquely identify a reference across all workers.
159

160
`remoteref_id` is a low-level API which returns a `RRID`
161
object that wraps `whence` and `id` values of a remote reference.
162
"""
163
remoteref_id(r::AbstractRemoteRef) = RRID(r.whence, r.id)
304,608✔
164

165
"""
166
    channel_from_id(id) -> c
167

168
A low-level API which returns the backing `AbstractChannel` for an `id` returned by
169
[`remoteref_id`](@ref).
170
The call is valid only on the node where the backing channel exists.
171
"""
172
function channel_from_id(id)
166✔
173
    rv = lock(client_refs) do
166✔
174
        return get(PGRP.refs, id, false)
332✔
175
    end
176
    if rv === false
166✔
177
        throw(ErrorException("Local instance of remote reference not found"))
×
178
    end
179
    return rv.c
166✔
180
end
181

182
lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(PGRP, rrid, f)
264,629✔
183
function lookup_ref(pg, rrid, f)
132,318✔
184
    return lock(client_refs) do
132,318✔
185
        rv = get(pg.refs, rrid, false)
249,575✔
186
        if rv === false
132,318✔
187
            # first we've heard of this ref
188
            rv = RemoteValue(invokelatest(f))
15,061✔
189
            pg.refs[rrid] = rv
15,061✔
190
            push!(rv.clientset, rrid.whence)
15,061✔
191
        end
192
        return rv
132,318✔
193
    end::RemoteValue
194
end
195

196
"""
197
    isready(rr::Future)
198

199
Determine whether a [`Future`](@ref) has a value stored to it.
200

201
If the argument `Future` is owned by a different node, this call will block to wait for the answer.
202
It is recommended to wait for `rr` in a separate task instead
203
or to use a local [`Channel`](@ref) as a proxy:
204

205
```julia
206
p = 1
207
f = Future(p)
208
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
209
isready(f)  # will not block
210
```
211
"""
212
function isready(rr::Future)
5✔
213
    v_cache = @atomic rr.v
5✔
214
    v_cache === nothing || return true
7✔
215

216
    rid = remoteref_id(rr)
3✔
217
    return if rr.where == myid()
3✔
218
        isready(lookup_ref(rid).c)
1✔
219
    else
220
        remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid)
4✔
221
    end
222
end
223

224
"""
225
    isready(rr::RemoteChannel, args...)
226

227
Determine whether a [`RemoteChannel`](@ref) has a value stored to it.
228
Note that this function can cause race conditions, since by the
229
time you receive its result it may no longer be true. However,
230
it can be safely used on a [`Future`](@ref) since they are assigned only once.
231
"""
232
function isready(rr::RemoteChannel, args...)
5✔
233
    rid = remoteref_id(rr)
5✔
234
    return if rr.where == myid()
5✔
235
        isready(lookup_ref(rid).c, args...)
3✔
236
    else
237
        remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid)
4✔
238
    end
239
end
240

241
del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())
100,016✔
242

243
del_client(id, client) = del_client(PGRP, id, client)
100,901✔
244
function del_client(pg, id, client)
876✔
245
    lock(client_refs) do
100,907✔
246
        _del_client(pg, id, client)
100,907✔
247
    end
248
    nothing
876✔
249
end
250

251
function _del_client(pg, id, client)
100,964✔
252
    rv = get(pg.refs, id, false)
201,720✔
253
    if rv !== false
100,964✔
254
        delete!(rv.clientset, client)
100,756✔
255
        if isempty(rv.clientset)
100,756✔
256
            delete!(pg.refs, id)
100,719✔
257
            #print("$(myid()) collected $id\n")
258
        end
259
    end
260
    nothing
100,964✔
261
end
262

263
function del_clients(pairs::Vector)
402✔
264
    for p in pairs
402✔
265
        del_client(p[1], p[2])
870✔
266
    end
870✔
267
end
268

269
# The task below is coalescing the `flush_gc_msgs` call
270
# across multiple producers, see `send_del_client`,
271
# and `send_add_client`.
272
# XXX: Is this worth the additional complexity?
273
#      `flush_gc_msgs` has to iterate over all connected workers.
274
const any_gc_flag = Threads.Condition()
275
function start_gc_msgs_task()
129✔
276
    errormonitor(
129✔
277
        Threads.@spawn begin
278
            while true
403✔
279
                lock(any_gc_flag) do
406✔
280
                    # this might miss events
281
                    wait(any_gc_flag)
406✔
282
                end
283
                # Use invokelatest() so that custom message transport streams
284
                # for workers can be defined in a newer world age than the Task
285
                # which runs the loop here.
286
                invokelatest(flush_gc_msgs) # handles throws internally
278✔
287
            end
277✔
288
        end
289
    )
290
end
291

292
# Function can be called within a finalizer
293
function send_del_client(rr)
100,064✔
294
    if rr.where == myid()
100,064✔
295
        del_client(rr)
100,016✔
296
    elseif id_in_procs(rr.where) # process only if a valid worker
48✔
297
        process_worker(rr)
48✔
298
    end
299
end
300

301
function send_del_client_no_lock(rr)
244✔
302
    # for gc context to avoid yields
303
    if rr.where == myid()
875✔
304
        _del_client(PGRP, remoteref_id(rr), myid())
57✔
305
    elseif id_in_procs(rr.where) # process only if a valid worker
818✔
306
        process_worker(rr)
807✔
307
    end
308
end
309

310
function publish_del_msg!(w::Worker, msg)
855✔
311
    lock(w.msg_lock) do
855✔
312
        push!(w.del_msgs, msg)
855✔
313
        @atomic w.gcflag = true
855✔
314
    end
315
    lock(any_gc_flag) do
855✔
316
        notify(any_gc_flag)
855✔
317
    end
318
end
319

320
function process_worker(rr)
855✔
321
    w = worker_from_id(rr.where)::Worker
855✔
322
    msg = (remoteref_id(rr), myid())
855✔
323

324
    # Needs to acquire a lock on the del_msg queue
325
    T = Threads.@spawn begin
855✔
326
        publish_del_msg!($w, $msg)
855✔
327
    end
328
    Base.errormonitor(T)
855✔
329

330
    return
855✔
331
end
332

333
function add_client(id, client)
840✔
334
    lock(client_refs) do
840✔
335
        rv = lookup_ref(id)
840✔
336
        push!(rv.clientset, client)
840✔
337
    end
338
    nothing
840✔
339
end
340

341
function add_clients(pairs::Vector)
4✔
342
    for p in pairs
4✔
343
        add_client(p[1], p[2]...)
4✔
344
    end
4✔
345
end
346

347
function send_add_client(rr::AbstractRemoteRef, i)
18✔
348
    if rr.where == myid()
18✔
349
        add_client(remoteref_id(rr), i)
14✔
350
    elseif (i != rr.where) && id_in_procs(rr.where)
4✔
351
        # don't need to send add_client if the message is already going
352
        # to the processor that owns the remote ref. it will add_client
353
        # itself inside deserialize().
354
        w = worker_from_id(rr.where)
4✔
355
        lock(w.msg_lock) do
4✔
356
            push!(w.add_msgs, (remoteref_id(rr), i))
4✔
357
            @atomic w.gcflag = true
4✔
358
        end
359
        lock(any_gc_flag) do
4✔
360
            notify(any_gc_flag)
4✔
361
        end
362
    end
363
end
364

365
channel_type(rr::RemoteChannel{T}) where {T} = T
826✔
366

367
function serialize(s::ClusterSerializer, f::Future)
15✔
368
    v_cache = @atomic f.v
15✔
369
    if v_cache === nothing
15✔
370
        p = worker_id_from_socket(s.io)
14✔
371
        (p !== f.where) && send_add_client(f, p)
14✔
372
    end
373
    invoke(serialize, Tuple{ClusterSerializer, Any}, s, f)
15✔
374
end
375

376
function serialize(s::ClusterSerializer, rr::RemoteChannel)
826✔
377
    p = worker_id_from_socket(s.io)
826✔
378
    (p !== rr.where) && send_add_client(rr, p)
826✔
379
    invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
826✔
380
end
381

382
function deserialize(s::ClusterSerializer, t::Type{<:Future})
15✔
383
    fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy
15✔
384
    f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table
15✔
385

386
    # 1) send_add_client() is not executed when the ref is being serialized
387
    #    to where it exists, hence do it here.
388
    # 2) If we have received a 'fetch'ed Future or if the Future ctor found an
389
    #    already 'fetch'ed instance in client_refs (Issue #25847), we should not
390
    #    track it in the backing RemoteValue store.
391
    f2v_cache = @atomic f2.v
15✔
392
    if f2.where == myid() && f2v_cache === nothing
15✔
393
        add_client(remoteref_id(f2), myid())
11✔
394
    end
395
    f2
15✔
396
end
397

398
function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel})
826✔
399
    rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
826✔
400
    if rr.where == myid()
826✔
401
        # send_add_client() is not executed when the ref is being
402
        # serialized to where it exists
403
        add_client(remoteref_id(rr), myid())
811✔
404
    end
405
    # call ctor to make sure this rr gets added to the client_refs table
406
    RemoteChannel{channel_type(rr)}(rr.where, RRID(rr.whence, rr.id))
826✔
407
end
408

409
# Future and RemoteChannel are serializable only in a running cluster.
410
# Serialize zeroed-out values to non ClusterSerializer objects
411
function serialize(s::AbstractSerializer, ::Future)
1✔
412
    zero_fut = Future((0,0,0,nothing))
1✔
413
    invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut)
1✔
414
end
415

416
function serialize(s::AbstractSerializer, ::RemoteChannel)
1✔
417
    zero_rc = RemoteChannel{Channel{Any}}((0,0,0))
1✔
418
    invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc)
1✔
419
end
420

421

422
# make a thunk to call f on args in a way that simulates what would happen if
423
# the function were sent elsewhere
424
function local_remotecall_thunk(f, args, kwargs)
100,169✔
425
    return ()->invokelatest(f, args...; kwargs...)
200,338✔
426
end
427

428
function remotecall(f, w::LocalProcess, args...; kwargs...)
200,072✔
429
    rr = Future(w)
100,036✔
430
    schedule_call(remoteref_id(rr), local_remotecall_thunk(f, args, kwargs))
100,036✔
431
    return rr
100,036✔
432
end
433

434
function remotecall(f, w::Worker, args...; kwargs...)
862✔
435
    rr = Future(w)
431✔
436
    send_msg(w, MsgHeader(remoteref_id(rr)), CallMsg{:call}(f, args, kwargs))
431✔
437
    return rr
431✔
438
end
439

440
"""
441
    remotecall(f, id::Integer, args...; kwargs...) -> Future
442

443
Call a function `f` asynchronously on the given arguments on the specified process.
444
Return a [`Future`](@ref).
445
Keyword arguments, if any, are passed through to `f`.
446
"""
447
remotecall(f, id::Integer, args...; kwargs...) = remotecall(f, worker_from_id(id), args...; kwargs...)
200,906✔
448

449
function remotecall_fetch(f, w::LocalProcess, args...; kwargs...)
198✔
450
    v=run_work_thunk(local_remotecall_thunk(f,args, kwargs), false)
99✔
451
    return isa(v, RemoteException) ? throw(v) : v
99✔
452
end
453

454
function remotecall_fetch(f, w::Worker, args...; kwargs...)
29,168✔
455
    # can be weak, because the program will have no way to refer to the Ref
456
    # itself, it only gets the result.
457
    oid = RRID()
14,584✔
458
    rv = lookup_ref(oid)
14,584✔
459
    rv.waitingfor = w.id
14,584✔
460
    send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs))
14,584✔
461
    v = take!(rv)
14,551✔
462
    lock(client_refs) do
14,551✔
463
        delete!(PGRP.refs, oid)
14,551✔
464
    end
465
    return isa(v, RemoteException) ? throw(v) : v
14,551✔
466
end
467

468
"""
469
    remotecall_fetch(f, id::Integer, args...; kwargs...)
470

471
Perform `fetch(remotecall(...))` in one message.
472
Keyword arguments, if any, are passed through to `f`.
473
Any remote exceptions are captured in a
474
[`RemoteException`](@ref) and thrown.
475

476
See also [`fetch`](@ref) and [`remotecall`](@ref).
477

478
# Examples
479
```julia-repl
480
\$ julia -p 2
481

482
julia> remotecall_fetch(sqrt, 2, 4)
483
2.0
484

485
julia> remotecall_fetch(sqrt, 2, -4)
486
ERROR: On worker 2:
487
DomainError with -4.0:
488
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
489
...
490
```
491
"""
492
remotecall_fetch(f, id::Integer, args...; kwargs...) =
43,537✔
493
    remotecall_fetch(f, worker_from_id(id), args...; kwargs...)
494

495
remotecall_wait(f, w::LocalProcess, args...; kwargs...) = wait(remotecall(f, w, args...; kwargs...))
28✔
496

497
function remotecall_wait(f, w::Worker, args...; kwargs...)
608✔
498
    prid = RRID()
304✔
499
    rv = lookup_ref(prid)
304✔
500
    rv.waitingfor = w.id
304✔
501
    rr = Future(w)
304✔
502
    send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs))
304✔
503
    v = fetch(rv.c)
304✔
504
    lock(client_refs) do
304✔
505
        delete!(PGRP.refs, prid)
304✔
506
    end
507
    isa(v, RemoteException) && throw(v)
304✔
508
    return rr
300✔
509
end
510

511
"""
512
    remotecall_wait(f, id::Integer, args...; kwargs...)
513

514
Perform a faster `wait(remotecall(...))` in one message on the `Worker` specified by worker id `id`.
515
Keyword arguments, if any, are passed through to `f`.
516

517
See also [`wait`](@ref) and [`remotecall`](@ref).
518
"""
519
remotecall_wait(f, id::Integer, args...; kwargs...) =
1,215✔
520
    remotecall_wait(f, worker_from_id(id), args...; kwargs...)
521

522
function remote_do(f, w::LocalProcess, args...; kwargs...)
68✔
523
    # the LocalProcess version just performs in local memory what a worker
524
    # does when it gets a :do message.
525
    # same for other messages on LocalProcess.
526
    thk = local_remotecall_thunk(f, args, kwargs)
34✔
527
    schedule(Task(thk))
34✔
528
    nothing
34✔
529
end
530

531
function remote_do(f, w::Worker, args...; kwargs...)
2,946✔
532
    send_msg(w, MsgHeader(), RemoteDoMsg(f, args, kwargs))
1,473✔
533
    nothing
1,472✔
534
end
535

536

537
"""
538
    remote_do(f, id::Integer, args...; kwargs...) -> nothing
539

540
Executes `f` on worker `id` asynchronously.
541
Unlike [`remotecall`](@ref), it does not store the
542
result of computation, nor is there a way to wait for its completion.
543

544
A successful invocation indicates that the request has been accepted for execution on
545
the remote node.
546

547
While consecutive `remotecall`s to the same worker are serialized in the order they are
548
invoked, the order of executions on the remote worker is undetermined. For example,
549
`remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)` will serialize the call
550
to `f1`, followed by `f2` and `f3` in that order. However, it is not guaranteed that `f1`
551
is executed before `f3` on worker 2.
552

553
Any exceptions thrown by `f` are printed to [`stderr`](@ref) on the remote worker.
554

555
Keyword arguments, if any, are passed through to `f`.
556
"""
557
remote_do(f, id::Integer, args...; kwargs...) = remote_do(f, worker_from_id(id), args...; kwargs...)
4,348✔
558

559
# have the owner of rr call f on it
560
function call_on_owner(f, rr::AbstractRemoteRef, args...)
1,464✔
561
    rid = remoteref_id(rr)
1,464✔
562
    if rr.where == myid()
1,464✔
563
        f(rid, args...)
979✔
564
    else
565
        remotecall_fetch(f, rr.where, rid, args...)
517✔
566
    end
567
end
568

569
function wait_ref(rid, caller, args...)
442✔
570
    v = fetch_ref(rid, args...)
442✔
571
    if isa(v, RemoteException)
442✔
572
        if myid() == caller
3✔
573
            throw(v)
2✔
574
        else
575
            return v
1✔
576
        end
577
    end
578
    nothing
439✔
579
end
580

581
"""
582
    wait(r::Future)
583

584
Wait for a value to become available for the specified [`Future`](@ref).
585
"""
586
wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)
1,737✔
587

588
"""
589
    wait(r::RemoteChannel, args...)
590

591
Wait for a value to become available on the specified [`RemoteChannel`](@ref).
592
"""
593
wait(r::RemoteChannel, args...) = (call_on_owner(wait_ref, r, myid(), args...); r)
×
594

595
"""
596
    fetch(x::Future)
597

598
Wait for and get the value of a [`Future`](@ref). The fetched value is cached locally.
599
Further calls to `fetch` on the same reference return the cached value. If the remote value
600
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
601
"""
602
function fetch(r::Future)
100,077✔
603
    v_cache = @atomic r.v
100,077✔
604
    v_cache !== nothing && return something(v_cache)
100,077✔
605

606
    if r.where == myid()
100,069✔
607
        rv, v_cache = @lock r.lock begin
200,032✔
608
            v_cache = @atomic :monotonic r.v
100,016✔
609
            rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing
100,016✔
610
            rv, v_cache
100,016✔
611
        end
612

613
        if v_cache !== nothing
100,016✔
614
            return something(v_cache)
×
615
        else
616
            v_local = fetch(rv.c)
200,032✔
617
        end
618
    else
619
        v_local = call_on_owner(fetch_ref, r)
53✔
620
    end
621

622
    v_cache = @atomic r.v
100,064✔
623

624
    if v_cache === nothing # call_on_owner case
100,064✔
625
        v_old, status = @lock r.lock begin
200,107✔
626
            @atomicreplace r.v nothing => Some(v_local)
200,108✔
627
        end
628
        # status == true - when value obtained through call_on_owner
629
        # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
630
        # why? local put! performs caching and putting into channel under r.lock
631

632
        # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
633

634
        # remote calls getting the value from `call_on_owner` used to return the value directly without wrapping it in `Some(x)`
635
        # so we're doing the same thing here
636
        if status
100,054✔
637
            send_del_client(r)
100,093✔
638
            return v_local
100,054✔
639
        else # this `v_cache` is returned at the end of the function
640
            v_cache = v_old
×
641
        end
642
    end
643

644
    send_del_client(r)
19✔
645
    something(v_cache)
10✔
646
end
647

648
fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
1,300✔
649

650
"""
651
    fetch(c::RemoteChannel)
652

653
Wait for and get a value from a [`RemoteChannel`](@ref). Exceptions raised are the
654
same as for a [`Future`](@ref). Does not remove the item fetched.
655
"""
656
fetch(r::RemoteChannel, args...) = call_on_owner(fetch_ref, r, args...)::eltype(r)
819✔
657

658
isready(rv::RemoteValue, args...) = isready(rv.c, args...)
17✔
659

660
"""
661
    put!(rr::Future, v)
662

663
Store a value to a [`Future`](@ref) `rr`.
664
`Future`s are write-once remote references.
665
A `put!` on an already set `Future` throws an `Exception`.
666
All asynchronous remote calls return `Future`s and set the
667
value to the return value of the call upon completion.
668
"""
669
function put!(r::Future, v)
17✔
670
    if r.where == myid()
17✔
671
        rid = remoteref_id(r)
10✔
672
        rv = lookup_ref(rid)
10✔
673
        isready(rv) && error("Future can be set only once")
10✔
674
        @lock r.lock begin
20✔
675
            put!(rv, v) # this notifies the tasks waiting on the channel in fetch
10✔
676
            set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
11✔
677
        end
678
        del_client(rid, myid())
9✔
679
    else
680
        @lock r.lock begin # same idea as above if there were any local tasks fetching on this Future
7✔
681
            call_on_owner(put_future, r, v, myid())
7✔
682
            set_future_cache(r, v)
7✔
683
        end
684
    end
685
    r
14✔
686
end
687

688
function set_future_cache(r::Future, v)
16✔
689
    _, ok = @atomicreplace r.v nothing => Some(v)
16✔
690
    ok || error("internal consistency error detected for Future")
18✔
691
end
692

693
function put_future(rid, v, caller)
7✔
694
    rv = lookup_ref(rid)
7✔
695
    isready(rv) && error("Future can be set only once")
7✔
696
    put!(rv, v)
6✔
697
    # The caller has the value and hence can be removed from the remote store.
698
    del_client(rid, caller)
6✔
699
    nothing
6✔
700
end
701

702

703
put!(rv::RemoteValue, args...) = put!(rv.c, args...)
115,916✔
704
function put_ref(rid, caller, args...)
80✔
705
    rv = lookup_ref(rid)
80✔
706
    put!(rv, args...)
80✔
707
    if myid() == caller && rv.synctake !== nothing
79✔
708
        # Wait till a "taken" value is serialized out - github issue #29932
709
        lock(rv.synctake)
20✔
710
        unlock(rv.synctake)
20✔
711
    end
712
    nothing
79✔
713
end
714

715
"""
716
    put!(rr::RemoteChannel, args...)
717

718
Store a set of values to the [`RemoteChannel`](@ref).
719
If the channel is full, blocks until space is available.
720
Return the first argument.
721
"""
722
put!(rr::RemoteChannel, args...) = (call_on_owner(put_ref, rr, myid(), args...); rr)
175✔
723

724
# take! is not supported on Future
725

726
take!(rv::RemoteValue, args...) = take!(rv.c, args...)
14,599✔
727
function take_ref(rid, caller, args...)
48✔
728
    rv = lookup_ref(rid)
48✔
729
    synctake = false
48✔
730
    if myid() != caller && rv.synctake !== nothing
48✔
731
        # special handling for local put! / remote take! on unbuffered channel
732
        # github issue #29932
733
        synctake = true
11✔
734
        lock(rv.synctake)
11✔
735
    end
736

737
    v = try
48✔
738
        take!(rv, args...)
50✔
739
    catch e
740
        # avoid unmatched unlock when exception occurs
741
        # github issue #33972
742
        synctake && unlock(rv.synctake)
3✔
743
        rethrow(e)
2✔
744
    end
745

746
    isa(v, RemoteException) && (myid() == caller) && throw(v)
46✔
747

748
    if synctake
46✔
749
        return SyncTake(v, rv)
10✔
750
    else
751
        return v
36✔
752
    end
753
end
754

755
"""
756
    take!(rr::RemoteChannel, args...)
757

758
Fetch value(s) from a [`RemoteChannel`](@ref) `rr`,
759
removing the value(s) in the process.
760
"""
761
take!(rr::RemoteChannel, args...) = call_on_owner(take_ref, rr, myid(), args...)::eltype(rr)
68✔
762

763
# close and isopen are not supported on Future
764

765
close_ref(rid) = (close(lookup_ref(rid).c); nothing)
10✔
766
close(rr::RemoteChannel) = call_on_owner(close_ref, rr)
6✔
767

768
isopen_ref(rid) = isopen(lookup_ref(rid).c)
31✔
769
isopen(rr::RemoteChannel) = call_on_owner(isopen_ref, rr)
42✔
770

771
getindex(r::RemoteChannel) = fetch(r)
3✔
772
getindex(r::Future) = fetch(r)
2✔
773

774
getindex(r::Future, args...) = getindex(fetch(r), args...)
2✔
775
function getindex(r::RemoteChannel, args...)
3✔
776
    if r.where == myid()
3✔
777
        return getindex(fetch(r), args...)
2✔
778
    end
779
    return remotecall_fetch(getindex, r.where, r, args...)
1✔
780
end
781

782
function iterate(c::RemoteChannel, state=nothing)
28✔
783
    if isopen(c) || isready(c)
42✔
784
        try
23✔
785
            return (take!(c), nothing)
23✔
786
        catch e
787
            if isa(e, InvalidStateException) ||
1✔
788
                (isa(e, RemoteException) &&
789
                isa(e.captured.ex, InvalidStateException) &&
790
                e.captured.ex.state === :closed)
791
                return nothing
1✔
792
            end
793
            rethrow()
×
794
        end
795
    else
796
        return nothing
2✔
797
    end
798
end
799

800
IteratorSize(::Type{<:RemoteChannel}) = SizeUnknown()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc