• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37604

26 Aug 2023 05:06AM UTC coverage: 86.533% (+0.1%) from 86.402%
#37604

push

local

web-flow
allow `@overlay` for methods with return type declaration (#51054)

12 of 12 new or added lines in 1 file covered. (100.0%)

73399 of 84822 relevant lines covered (86.53%)

12701637.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.07
/stdlib/Distributed/src/process_messages.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
# data stored by the owner of a remote reference
# Default backing store for a remote value: a buffered channel holding a
# single item of any type.
function def_rv_channel()
    return Channel{Any}(1)
end
5
mutable struct RemoteValue
    c::AbstractChannel
    clientset::BitSet # Set of workerids that have a reference to this channel.
                      # Keeping ids instead of a count aids in cleaning up upon
                      # a worker exit.

    waitingfor::Int   # processor we need to hear from to fill this, or 0

    synctake::Union{ReentrantLock, Nothing}  # A lock used to synchronize the
                      # specific case of a local put! / remote take! on an
                      # unbuffered store. github issue #29932

    function RemoteValue(c)
        # `isbuffered` may throw for channel types that do not support the
        # query; treat any failure as "unbuffered".
        buffered = false
        try
            buffered = isbuffered(c)
        catch
        end
        # Only unbuffered stores need the put!/take! synchronization lock.
        return new(c, BitSet(), 0, buffered ? nothing : ReentrantLock())
    end
end
31

32
# Waiting on a remote value waits on its backing channel.
wait(rv::RemoteValue) = wait(rv.c)
113✔
33

34
# A wrapper type to handle issue #29932 which requires locking / unlocking of
# RemoteValue.synctake outside of lexical scope.
struct SyncTake
    v::Any              # the value that was taken from the store
    rv::RemoteValue     # remote value whose `synctake` lock is still held
end
40

41
## core messages: do, call, fetch, wait, ref, put! ##

# An exception raised on worker `pid`, carrying the captured exception and a
# serializable backtrace so it can be rethrown locally.
struct RemoteException <: Exception
    pid::Int
    captured::CapturedException
end
46

47
"""
48
    capture_exception(ex::RemoteException, bt)
49

50
Returns `ex::RemoteException` which has already captured a backtrace (via it's [`CapturedException`](@ref) field `captured`).
51
"""
52
Base.capture_exception(ex::RemoteException, bt) = ex
2✔
53

54
"""
55
    RemoteException(captured)
56

57
Exceptions on remote computations are captured and rethrown locally.  A `RemoteException`
58
wraps the `pid` of the worker and a captured exception. A `CapturedException` captures the
59
remote exception and a serializable form of the call stack when the exception was raised.
60
"""
61
RemoteException(captured) = RemoteException(myid(), captured)
86✔
62
# Show a `RemoteException`, prefixing the originating worker id when the
# exception did not come from the current process.
function showerror(io::IO, re::RemoteException)
    if re.pid != myid()
        print(io, "On worker ", re.pid, ":\n")
    end
    showerror(io, re.captured)
end
66

67
"""
    run_work_thunk(thunk::Function, print_error::Bool)

Run `thunk()` and return its result. If the thunk throws, the exception and
its backtrace are captured and returned wrapped in a `RemoteException`
instead of being rethrown; when `print_error` is `true` the captured
exception is also shown on `stderr`.
"""
function run_work_thunk(thunk::Function, print_error::Bool)
    try
        return thunk()
    catch err
        captured = CapturedException(err, catch_backtrace())
        print_error && showerror(stderr, captured)
        return RemoteException(captured)
    end
end
78
# Evaluate `thunk` via `run_work_thunk(thunk, false)` and store the outcome
# (the result, or a wrapping `RemoteException`) into the remote value `rv`.
function run_work_thunk(rv::RemoteValue, thunk)
    outcome = run_work_thunk(thunk, false)
    put!(rv, outcome)
    return nothing
end
82

83
# Create a `RemoteValue` for the remote call identified by `rid`, register it
# in the process group's `refs` table (under the `client_refs` lock), record
# the requesting node (`rid.whence`) as a client, and start a monitored async
# task that runs `thunk` and stores its result into the new value.
function schedule_call(rid, thunk)
    return lock(client_refs) do
        rv = RemoteValue(def_rv_channel())
        (PGRP::ProcessGroup).refs[rid] = rv
        push!(rv.clientset, rid.whence)
        errormonitor(@async run_work_thunk(rv, thunk))
        return rv
    end
end
92

93

94
# Send the outcome of a call back over `sock`. For `:call_fetch` requests —
# and for any error outcome — the value itself is sent; other message kinds
# are acknowledged with `:OK`. A failure while sending tears the connection
# down so the reading end does not hang, and removes the affected worker.
function deliver_result(sock::IO, msg, oid, value)
    #print("$(myid()) sending result $oid\n")
    val = (msg === :call_fetch || value isa RemoteException) ? value : :OK
    try
        send_msg_now(sock, MsgHeader(oid), ResultMsg(val))
    catch e
        # terminate connection in case of serialization error
        # otherwise the reading end would hang
        @error "Fatal error on process $(myid())" exception=e,catch_backtrace()
        wid = worker_id_from_socket(sock)
        close(sock)
        if myid() == 1
            rmprocs(wid)
        elseif wid == 1
            exit(1)
        else
            remote_do(rmprocs, 1, wid)
        end
    end
end
118

119
## message event handlers ##

# Launch the TCP message-processing loop on a new task; failures in the task
# are surfaced via `errormonitor`.
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
    task = @async process_tcp_streams(r_stream, w_stream, incoming)
    errormonitor(task)
end
123

124
# Configure each distinct socket (disable Nagle, enable quickack), wait for
# it to be connected, then enter the message handler loop.
function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
    sockets = r_stream != w_stream ? (r_stream, w_stream) : (r_stream,)
    for s in sockets
        Sockets.nagle(s, false)
        Sockets.quickack(s, true)
        wait_connected(s)
    end
    message_handler_loop(r_stream, w_stream, incoming)
end
135

136
"""
137
    process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
138

139
Called by cluster managers using custom transports. It should be called when the custom
140
transport implementation receives the first message from a remote worker. The custom
141
transport must manage a logical connection to the remote worker and provide two
142
`IO` objects, one for incoming messages and the other for messages addressed to the
143
remote worker.
144
If `incoming` is `true`, the remote peer initiated the connection.
145
Whichever of the pair initiates the connection sends the cluster cookie and its
146
Julia version number to perform the authentication handshake.
147

148
See also [`cluster_cookie`](@ref).
149
"""
150
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
×
151
    errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
×
152
end
153

154
# Core receive loop for one peer connection: performs the cookie/version
# handshake, then repeatedly deserializes message headers and payloads and
# dispatches them via `handle_msg`. On any uncaught error the connection is
# closed and (unless the worker was already terminating) the worker is
# deregistered; if the error involves pid 1 the process exits.
function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
    wpid=0          # the worker r_stream is connected to.
    # Reusable buffer for reading the inter-message boundary bytes.
    boundary = similar(MSG_BOUNDARY)
    try
        version = process_hdr(r_stream, incoming)
        serializer = ClusterSerializer(r_stream)

        # The first message will associate wpid with r_stream
        header = deserialize_hdr_raw(r_stream)
        msg = deserialize_msg(serializer)
        handle_msg(msg, header, r_stream, w_stream, version)
        wpid = worker_id_from_socket(r_stream)
        @assert wpid > 0

        readbytes!(r_stream, boundary, length(MSG_BOUNDARY))

        while true
            reset_state(serializer)
            header = deserialize_hdr_raw(r_stream)
            # println("header: ", header)

            try
                msg = invokelatest(deserialize_msg, serializer)
            catch e
                # Deserialization error; discard bytes in stream until boundary found
                boundary_idx = 1
                while true
                    # This may throw an EOF error if the terminal boundary was not written
                    # correctly, triggering the higher-scoped catch block below
                    byte = read(r_stream, UInt8)
                    if byte == MSG_BOUNDARY[boundary_idx]
                        boundary_idx += 1
                        if boundary_idx > length(MSG_BOUNDARY)
                            break
                        end
                    else
                        boundary_idx = 1
                    end
                end

                # remotecalls only rethrow RemoteExceptions. Any other exception is treated as
                # data to be returned. Wrap this exception in a RemoteException.
                remote_err = RemoteException(myid(), CapturedException(e, catch_backtrace()))
                # println("Deserialization error. ", remote_err)
                if !null_id(header.response_oid)
                    ref = lookup_ref(header.response_oid)
                    put!(ref, remote_err)
                end
                if !null_id(header.notify_oid)
                    deliver_result(w_stream, :call_fetch, header.notify_oid, remote_err)
                end
                continue
            end
            readbytes!(r_stream, boundary, length(MSG_BOUNDARY))

            # println("got msg: ", typeof(msg))
            handle_msg(msg, header, r_stream, w_stream, version)
        end
    catch e
        werr = worker_from_id(wpid)
        oldstate = werr.state

        # Check again as it may have been set in a message handler but not propagated to the calling block above
        if wpid < 1
            wpid = worker_id_from_socket(r_stream)
        end

        if wpid < 1
            println(stderr, e, CapturedException(e, catch_backtrace()))
            println(stderr, "Process($(myid())) - Unknown remote, closing connection.")
        elseif !(wpid in map_del_wrkr)
            set_worker_state(werr, W_TERMINATED)

            # If unhandleable error occurred talking to pid 1, exit
            if wpid == 1
                if isopen(w_stream)
                    @error "Fatal error on process $(myid())" exception=e,catch_backtrace()
                end
                exit(1)
            end

            # Will treat any exception as death of node and cleanup
            # since currently we do not have a mechanism for workers to reconnect
            # to each other on unhandled errors
            deregister_worker(wpid)
        end

        close(r_stream)
        close(w_stream)

        if (myid() == 1) && (wpid > 1)
            if oldstate != W_TERMINATING
                println(stderr, "Worker $wpid terminated.")
                rethrow()
            end
        end

        return nothing
    end
end
254

255
# Perform the connection handshake on stream `s`: optionally validate the
# shared cluster cookie byte-by-byte, then read and return the peer's Julia
# version as a `VersionNumber`.
function process_hdr(s, validate_cookie)
    if validate_cookie
        cookie = read(s, HDR_COOKIE_LEN)
        length(cookie) >= HDR_COOKIE_LEN ||
            error("Cookie read failed. Connection closed by peer.")

        self_cookie = cluster_cookie()
        for i in 1:HDR_COOKIE_LEN
            UInt8(self_cookie[i]) == cookie[i] ||
                error("Process($(myid())) - Invalid connection credentials sent by remote.")
        end
    end

    # When we have incompatible julia versions trying to connect to each other,
    # and can be detected, raise an appropriate error.
    # For now, just return the version.
    version = read(s, HDR_VERSION_LEN)
    length(version) >= HDR_VERSION_LEN ||
        error("Version read failed. Connection closed by peer.")

    return VersionNumber(strip(String(version)))
end
280

281
# Remote call without a synchronous reply: schedule the call locally; the
# caller retrieves the result later via the response oid.
function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
    schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
end
284
# Call-and-fetch: run the thunk on a monitored task and send the value itself
# back to the caller. A `SyncTake` result carries a lock that was acquired for
# a local put! / remote take! on an unbuffered store (issue #29932); it is
# released only after the result has been delivered.
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
    errormonitor(@async begin
        v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false)
        if isa(v, SyncTake)
            try
                deliver_result(w_stream, :call_fetch, header.notify_oid, v.v)
            finally
                unlock(v.rv.synctake)
            end
        else
            deliver_result(w_stream, :call_fetch, header.notify_oid, v)
        end
        nothing
    end)
end
299

300
# Call-and-wait: schedule the call, then block the async task on the result
# channel and notify the remote caller once a value is available.
function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
    errormonitor(@async begin
        rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
        deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
        nothing
    end)
end
307

308
# Fire-and-forget remote execution: run the thunk on a monitored task; any
# error is printed (print_error=true) rather than returned to the caller.
function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
    errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
end
311

312
# Deliver a result that arrived from the peer into the local ref that is
# waiting on it.
function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
    ref = lookup_ref(header.response_oid)
    return put!(ref, msg.value)
end
315

316
# A peer identified itself on a new connection: register a Worker for it,
# send our connection header plus an acknowledgement, and mark it initialized.
function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
    # register a new peer worker connection
    w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
    send_connection_hdr(w, false)
    send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
    notify(w.initialized)
end
323

324
# Our identification was acknowledged: record the peer's Julia version.
function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version)
    peer = map_sock_wrkr[r_stream]
    peer.version = version
end
328

329
# Handle the controller's join message on a freshly started worker: adopt the
# assigned pid, register the controller and ourselves, apply the topology,
# BLAS-threading and laziness settings, connect (eagerly or lazily) to the
# already-known peers, and finally acknowledge the join to the controller.
function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
    LPROC.id = msg.self_pid
    controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
    notify(controller.initialized)
    register_worker(LPROC)
    topology(msg.topology)

    if !msg.enable_threaded_blas
        Base.disable_library_threading()
    end

    lazy = msg.lazy
    PGRP.lazy = lazy

    # Eager mode connects to all peers now (under @sync); lazy mode defers the
    # connection until the Worker is first used.
    @sync for (connect_at, rpid) in msg.other_workers
        wconfig = WorkerConfig()
        wconfig.connect_at = connect_at

        let rpid=rpid, wconfig=wconfig
            if lazy
                # The constructor registers the object with a global registry.
                Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
            else
                @async connect_to_peer(cluster_manager, rpid, wconfig)
            end
        end
    end

    send_connection_hdr(controller, false)
    send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid()))
end
360

361
# Establish a connection to peer worker `rpid` via `manager`, start processing
# its messages, and identify ourselves on the new connection. Any failure here
# is treated as fatal for this process (it exits).
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
    try
        (r_s, w_s) = connect(manager, rpid, wconfig)
        w = Worker(rpid, r_s, w_s, manager; config=wconfig)
        process_messages(w.r_stream, w.w_stream, false)
        send_connection_hdr(w, true)
        send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))
        notify(w.initialized)
    catch e
        @error "Error on $(myid()) while connecting to peer $rpid, exiting" exception=e,catch_backtrace()
        exit(1)
    end
end
374

375
# Finalize a worker's join: record its cpu-thread count, OS pid and Julia
# version in its config, notify the channel waiting on the join with the new
# worker id, and add the worker to the default worker pool.
function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version)
    w = map_sock_wrkr[r_stream]

    env = something(w.config.environ, Dict())
    env[:cpu_threads] = msg.cpu_threads
    w.config.environ = env
    w.config.ospid = msg.ospid
    w.version = version

    put!(lookup_ref(header.notify_oid), w.id)

    push!(default_worker_pool(), w.id)
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc