• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37584

pending completion
#37584

push

local

web-flow
relax assertion involving pg->nold to reflect that it may be a bit inaccurate with parallel marking (#50466)

70958 of 83746 relevant lines covered (84.73%)

23916169.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.5
/stdlib/Distributed/src/process_messages.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
# data stored by the owner of a remote reference
4
# Default backing store for a RemoteValue: an untyped channel with capacity 1.
function def_rv_channel()
    return Channel(1)
end
5
mutable struct RemoteValue
    c::AbstractChannel        # the store that holds the remote reference's value
    clientset::BitSet         # ids of workers holding a reference to this channel; ids
                              # (rather than a count) make cleanup on worker exit possible
    waitingfor::Int           # processor we need to hear from to fill this, or 0
    synctake::Union{ReentrantLock, Nothing}  # guards the local put! / remote take! case on
                                             # an unbuffered store (github issue #29932)

    # Wrap channel `c`. A lock is only allocated when `c` is not known to be buffered:
    # if `isbuffered(c)` cannot be evaluated (it throws), `c` is treated as unbuffered.
    function RemoteValue(c)
        buffered = try
            isbuffered(c)
        catch
            false
        end
        take_lock = buffered ? nothing : ReentrantLock()
        return new(c, BitSet(), 0, take_lock)
    end
end
31

32
# Waiting on a RemoteValue delegates to `wait` on its backing channel `rv.c`.
function wait(rv::RemoteValue)
    return wait(rv.c)
end
33

34
# A wrapper type to handle issue #29932 which requires locking / unlocking of
# RemoteValue.synctake outside of lexical scope.
# The `CallMsg{:call_fetch}` handler delivers `v` and then unlocks `rv.synctake`.
struct SyncTake
    v::Any           # the value to deliver to the requester
    rv::RemoteValue  # the RemoteValue whose `synctake` lock is released after delivery
end
40

41
## core messages: do, call, fetch, wait, ref, put! ##
42
# Exception type for errors raised by remote computations; pairs the id of the
# process where the error occurred with the captured exception and backtrace.
struct RemoteException <: Exception
    pid::Int                       # process id where the exception was captured
    captured::CapturedException    # original exception plus serializable backtrace
end
46

47
"""
48
    capture_exception(ex::RemoteException, bt)
49

50
Returns `ex::RemoteException` which has already captured a backtrace (via it's [`CapturedException`](@ref) field `captured`).
51
"""
52
Base.capture_exception(ex::RemoteException, bt) = ex
×
53

54
"""
55
    RemoteException(captured)
56

57
Exceptions on remote computations are captured and rethrown locally.  A `RemoteException`
58
wraps the `pid` of the worker and a captured exception. A `CapturedException` captures the
59
remote exception and a serializable form of the call stack when the exception was raised.
60
"""
61
RemoteException(captured) = RemoteException(myid(), captured)
×
62
# Show the wrapped exception. When it was raised on another process, prefix the
# output with the originating worker id.
function showerror(io::IO, re::RemoteException)
    if re.pid != myid()
        print(io, "On worker ", re.pid, ":\n")
    end
    showerror(io, re.captured)
end
66

67
# Run `thunk()` and return its result. An exception is not propagated: it is captured
# with its backtrace and returned as a RemoteException. When `print_error` is true, the
# captured error is also shown on stderr.
function run_work_thunk(thunk::Function, print_error::Bool)
    outcome = try
        thunk()
    catch err
        captured = CapturedException(err, catch_backtrace())
        wrapped = RemoteException(captured)
        print_error && showerror(stderr, captured)
        wrapped
    end
    return outcome
end
78
# Run `thunk` (errors are captured, not printed) and store the outcome in `rv`.
function run_work_thunk(rv::RemoteValue, thunk)
    outcome = run_work_thunk(thunk, false)
    put!(rv, outcome)
    return nothing
end
82

83
# While holding the `client_refs` lock:
#   - create a RemoteValue and register it under `rid` in the process group's refs;
#   - record `rid.whence` as a client of it;
#   - start `thunk` in a monitored task that puts its outcome into that RemoteValue.
# Returns the new RemoteValue.
function schedule_call(rid, thunk)
    registered = lock(client_refs) do
        store = RemoteValue(def_rv_channel())
        (PGRP::ProcessGroup).refs[rid] = store
        push!(store.clientset, rid.whence)
        errormonitor(@async run_work_thunk(store, thunk))
        store
    end
    return registered
end
92

93

94
# Send the outcome for response id `oid` over `sock`. The actual `value` is sent only
# for :call_fetch requests or when it is a RemoteException; otherwise `:OK` is sent.
function deliver_result(sock::IO, msg, oid, value)
    payload = (msg === :call_fetch || isa(value, RemoteException)) ? value : :OK
    try
        send_msg_now(sock, MsgHeader(oid), ResultMsg(payload))
    catch e
        # Tear down the connection on failure (e.g. a serialization error);
        # otherwise the reading end would hang waiting for the result.
        @error "Fatal error on process $(myid())" exception=e,catch_backtrace()
        wid = worker_id_from_socket(sock)
        close(sock)
        if myid() == 1
            rmprocs(wid)                 # we are the master: drop the worker directly
        elseif wid == 1
            exit(1)                      # lost the link to the master: give up
        else
            remote_do(rmprocs, 1, wid)   # ask the master to remove the peer worker
        end
    end
end
118

119
## message event handlers ##
120
# TCP transport: configure both sockets and run the message loop in a monitored
# background task; return that task.
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
    handler = @async process_tcp_streams(r_stream, w_stream, incoming)
    return errormonitor(handler)
end
123

124
# Disable Nagle's algorithm, enable quick ACKs and wait for the connection on each
# distinct socket (read stream first), then enter the message loop.
function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
    sockets = r_stream == w_stream ? (r_stream,) : (r_stream, w_stream)
    for sock in sockets
        Sockets.nagle(sock, false)
        Sockets.quickack(sock, true)
        wait_connected(sock)
    end
    message_handler_loop(r_stream, w_stream, incoming)
end
135

136
"""
137
    process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
138

139
Called by cluster managers using custom transports. It should be called when the custom
140
transport implementation receives the first message from a remote worker. The custom
141
transport must manage a logical connection to the remote worker and provide two
142
`IO` objects, one for incoming messages and the other for messages addressed to the
143
remote worker.
144
If `incoming` is `true`, the remote peer initiated the connection.
145
Whichever of the pair initiates the connection sends the cluster cookie and its
146
Julia version number to perform the authentication handshake.
147

148
See also [`cluster_cookie`](@ref).
149
"""
150
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
×
151
    errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
×
152
end
153

154
# Receive loop for one connection. It:
#   1. reads the connection header (the cookie is validated when `incoming`);
#   2. handles the first message, which associates `r_stream` with a worker id;
#   3. deserializes and dispatches messages until an error (e.g. EOF) ends the loop.
# On error the worker is marked W_TERMINATED and deregistered (unless already deleted),
# and both streams are closed. On the master, an unrequested worker termination is
# reported and rethrown.
function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
    wpid=0          # the worker r_stream is connected to.
    boundary = similar(MSG_BOUNDARY)
    try
        version = process_hdr(r_stream, incoming)
        serializer = ClusterSerializer(r_stream)

        # The first message will associate wpid with r_stream
        header = deserialize_hdr_raw(r_stream)
        msg = deserialize_msg(serializer)
        handle_msg(msg, header, r_stream, w_stream, version)
        wpid = worker_id_from_socket(r_stream)
        @assert wpid > 0

        readbytes!(r_stream, boundary, length(MSG_BOUNDARY))

        while true
            reset_state(serializer)
            header = deserialize_hdr_raw(r_stream)

            try
                msg = invokelatest(deserialize_msg, serializer)
            catch e
                # Deserialization error; discard bytes in stream until boundary found
                boundary_idx = 1
                while true
                    # This may throw an EOF error if the terminal boundary was not written
                    # correctly, triggering the higher-scoped catch block below
                    byte = read(r_stream, UInt8)
                    if byte == MSG_BOUNDARY[boundary_idx]
                        boundary_idx += 1
                        if boundary_idx > length(MSG_BOUNDARY)
                            break
                        end
                    else
                        boundary_idx = 1
                    end
                end

                # remotecalls only rethrow RemoteExceptions. Any other exception is treated as
                # data to be returned. Wrap this exception in a RemoteException.
                remote_err = RemoteException(myid(), CapturedException(e, catch_backtrace()))
                if !null_id(header.response_oid)
                    ref = lookup_ref(header.response_oid)
                    put!(ref, remote_err)
                end
                if !null_id(header.notify_oid)
                    deliver_result(w_stream, :call_fetch, header.notify_oid, remote_err)
                end
                continue
            end
            readbytes!(r_stream, boundary, length(MSG_BOUNDARY))

            handle_msg(msg, header, r_stream, w_stream, version)
        end
    catch e
        # `oldstate` is only read from a live worker record in the `elseif` branch below.
        # Initialise it here so the termination check at the end cannot throw an
        # UndefVarError when `wpid` was already deregistered (i.e. is in `map_del_wrkr`).
        oldstate = nothing

        # Check again as it may have been set in a message handler but not propagated to the calling block above
        if wpid < 1
            wpid = worker_id_from_socket(r_stream)
        end

        if wpid < 1
            println(stderr, e, CapturedException(e, catch_backtrace()))
            println(stderr, "Process($(myid())) - Unknown remote, closing connection.")
        elseif !(wpid in map_del_wrkr)
            werr = worker_from_id(wpid)
            oldstate = werr.state
            set_worker_state(werr, W_TERMINATED)

            # If unhandleable error occurred talking to pid 1, exit
            if wpid == 1
                if isopen(w_stream)
                    @error "Fatal error on process $(myid())" exception=e,catch_backtrace()
                end
                exit(1)
            end

            # Will treat any exception as death of node and cleanup
            # since currently we do not have a mechanism for workers to reconnect
            # to each other on unhandled errors
            deregister_worker(wpid)
        end

        close(r_stream)
        close(w_stream)

        if (myid() == 1) && (wpid > 1)
            # Report and rethrow unless the termination was requested (W_TERMINATING).
            # An already-deregistered worker (`oldstate === nothing`) is also reported,
            # because its prior state is unknown here.
            if oldstate != W_TERMINATING
                println(stderr, "Worker $wpid terminated.")
                rethrow()
            end
        end

        return nothing
    end
end
253

254
# Read the connection header from `s`. When `validate_cookie` is true:
#   - the first HDR_COOKIE_LEN bytes are read;
#   - the read fails with an error if too few arrive;
#   - it also fails if any byte differs from this process's `cluster_cookie()`.
# Then HDR_VERSION_LEN bytes are read and returned, whitespace-stripped, as a VersionNumber.
function process_hdr(s, validate_cookie)
    if validate_cookie
        received = read(s, HDR_COOKIE_LEN)
        length(received) < HDR_COOKIE_LEN &&
            error("Cookie read failed. Connection closed by peer.")

        expected = cluster_cookie()
        mismatch = any(i -> UInt8(expected[i]) != received[i], 1:HDR_COOKIE_LEN)
        mismatch &&
            error("Process($(myid())) - Invalid connection credentials sent by remote.")
    end

    # Incompatible Julia versions could be detected and rejected here;
    # for now the version is only parsed and returned.
    raw_version = read(s, HDR_VERSION_LEN)
    if length(raw_version) < HDR_VERSION_LEN
        error("Version read failed. Connection closed by peer.")
    end
    return VersionNumber(strip(String(raw_version)))
end
279

280
# :call — schedule the requested function to run asynchronously into a new
# RemoteValue registered under the response id.
function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
    work = () -> invokelatest(msg.f, msg.args...; msg.kwargs...)
    return schedule_call(header.response_oid, work)
end
283
# :call_fetch — run the function in a monitored task and send its result (or captured
# error) back under the notify id. A SyncTake result carries a held `synctake` lock,
# which is released once the wrapped value has been delivered.
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
    task = @async begin
        outcome = run_work_thunk(() -> invokelatest(msg.f, msg.args...; msg.kwargs...), false)
        if outcome isa SyncTake
            try
                deliver_result(w_stream, :call_fetch, header.notify_oid, outcome.v)
            finally
                unlock(outcome.rv.synctake)
            end
        else
            deliver_result(w_stream, :call_fetch, header.notify_oid, outcome)
        end
        nothing
    end
    errormonitor(task)
end
298

299
# CallWaitMsg — schedule the call, wait for its RemoteValue to be filled, then
# notify the caller (only errors are sent as values; otherwise :OK).
function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
    task = @async begin
        rv = schedule_call(header.response_oid, () -> invokelatest(msg.f, msg.args...; msg.kwargs...))
        outcome = fetch(rv.c)
        deliver_result(w_stream, :call_wait, header.notify_oid, outcome)
        nothing
    end
    errormonitor(task)
end
306

307
# RemoteDoMsg — fire-and-forget: run the function in a monitored task and print
# (rather than return) any error it raises.
function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
    work = () -> invokelatest(msg.f, msg.args...; msg.kwargs...)
    errormonitor(@async run_work_thunk(work, true))
end
310

311
# ResultMsg — store the received value in the local reference for the response id.
function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
    target = lookup_ref(header.response_oid)
    return put!(target, msg.value)
end
314

315
# IdentifySocketMsg — a peer identified itself on this connection: register it as a
# Worker, answer with our connection header and an ack, then mark it initialized.
function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
    peer = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
    send_connection_hdr(peer, false)
    send_msg_now(peer, MsgHeader(), IdentifySocketAckMsg())
    return notify(peer.initialized)
end
322

323
# IdentifySocketAckMsg — record the peer's version on the worker mapped to this socket.
function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version)
    map_sock_wrkr[r_stream].version = version
end
327

328
# JoinPGRPMsg — join the process group as instructed by the controller (pid 1):
#   - adopt the assigned id and register the controller and this process;
#   - apply the topology and the threaded-BLAS setting;
#   - set up (lazy or eager) connections to the other workers;
#   - send our connection header and a JoinCompleteMsg back.
function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
    LPROC.id = msg.self_pid
    controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
    notify(controller.initialized)
    register_worker(LPROC)
    topology(msg.topology)

    if !msg.enable_threaded_blas
        Base.disable_library_threading()
    end

    lazy = msg.lazy
    PGRP.lazy = lazy

    @sync for (peer_addr, peer_pid) in msg.other_workers
        peer_config = WorkerConfig()
        peer_config.connect_at = peer_addr

        let rpid = peer_pid, wconfig = peer_config
            if lazy
                # The constructor registers the object with a global registry; the
                # connect closure is kept for later (presumably run on first use).
                Worker(rpid, () -> connect_to_peer(cluster_manager, rpid, wconfig))
            else
                @async connect_to_peer(cluster_manager, rpid, wconfig)
            end
        end
    end

    send_connection_hdr(controller, false)
    completion = JoinCompleteMsg(Sys.CPU_THREADS, getpid())
    send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), completion)
end
359

360
# Connect to peer worker `rpid` through `manager` and register it as a Worker. Then start
# its message loop, send our connection header and an IdentifySocketMsg, and mark it
# initialized. Any failure is logged and this process exits with status 1.
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
    try
        r_s, w_s = connect(manager, rpid, wconfig)
        peer = Worker(rpid, r_s, w_s, manager; config=wconfig)
        process_messages(peer.r_stream, peer.w_stream, false)
        send_connection_hdr(peer, true)
        send_msg_now(peer, MsgHeader(), IdentifySocketMsg(myid()))
        notify(peer.initialized)
    catch e
        @error "Error on $(myid()) while connecting to peer $rpid, exiting" exception=e,catch_backtrace()
        exit(1)
    end
end
373

374
# JoinCompleteMsg — a worker finished joining. This handler:
#   - records its CPU thread count, OS pid and version in its config;
#   - notifies the waiting reference with the worker id;
#   - adds the worker to the default worker pool.
function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version)
    w = map_sock_wrkr[r_stream]
    cfg = w.config

    env = something(cfg.environ, Dict())
    env[:cpu_threads] = msg.cpu_threads
    cfg.environ = env
    cfg.ospid = msg.ospid
    w.version = version

    put!(lookup_ref(header.notify_oid), w.id)

    push!(default_worker_pool(), w.id)
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc