• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37584

pending completion
#37584

push

local

web-flow
relax assertion involving pg->nold to reflect that it may be a bit inaccurate with parallel marking (#50466)

70958 of 83746 relevant lines covered (84.73%)

23916169.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.01
/stdlib/Distributed/src/messages.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
abstract type AbstractMsg end
4

5

6
## Wire format description
7
#
8
# Each message has three parts, which are written in order to the worker's stream.
9
#  1) A header of type MsgHeader is serialized to the stream (via `serialize`).
10
#  2) A message of type AbstractMsg is then serialized.
11
#  3) Finally, a fixed boundary of 10 bytes is written.
12

13
# Message header stored separately from body to be able to send back errors if
14
# a deserialization error occurs when reading the message body.
15
struct MsgHeader
16
    response_oid::RRID
17
    notify_oid::RRID
18
    MsgHeader(respond_oid=RRID(0,0), notify_oid=RRID(0,0)) =
4,314✔
19
        new(respond_oid, notify_oid)
20
end
21

22
# Special oid (0,0) uses to indicate a null ID.
23
# Used instead of Union{Int, Nothing} to decrease wire size of header.
24
null_id(id) =  id == RRID(0, 0)
×
25

26
struct CallMsg{Mode} <: AbstractMsg
27
    f::Any
1,368✔
28
    args::Tuple
29
    kwargs
30
end
31
struct CallWaitMsg <: AbstractMsg
32
    f::Any
59✔
33
    args::Tuple
34
    kwargs
35
end
36
struct RemoteDoMsg <: AbstractMsg
37
    f::Any
2,079✔
38
    args::Tuple
39
    kwargs
40
end
41
struct ResultMsg <: AbstractMsg
42
    value::Any
974✔
43
end
44

45

46
# Worker initialization messages
47
struct IdentifySocketMsg <: AbstractMsg
48
    from_pid::Int
×
49
end
50

51
struct IdentifySocketAckMsg <: AbstractMsg
×
52
end
53

54
struct JoinPGRPMsg <: AbstractMsg
55
    self_pid::Int
85✔
56
    other_workers::Array
57
    topology::Symbol
58
    enable_threaded_blas::Bool
59
    lazy::Bool
60
end
61
struct JoinCompleteMsg <: AbstractMsg
62
    cpu_threads::Int
85✔
63
    ospid::Int
64
end
65

66
# Avoiding serializing AbstractMsg containers results in a speedup
67
# of approximately 10%. Can be removed once module Serialization
68
# has been suitably improved.
69

70
const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg,
71
                     JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg,
72
                     CallMsg{:call}, CallMsg{:call_fetch}]
73

74
for (idx, tname) in enumerate(msgtypes)
75
    exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ]
76
    @eval function serialize_msg(s::AbstractSerializer, o::$tname)
2,322✔
77
        write(s.io, UInt8($idx))
2,322✔
78
        $(exprs...)
3,823✔
79
        return nothing
2,322✔
80
    end
81
end
82

83
let msg_cases = :(@assert false "Message type index ($idx) expected to be between 1:$($(length(msgtypes)))")
84
    for i = length(msgtypes):-1:1
85
        mti = msgtypes[i]
86
        msg_cases = :(if idx == $i
87
                          $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), fieldcount(mti))...))
2,328✔
88
                      else
89
                          $msg_cases
13,695✔
90
                      end)
91
    end
92
    @eval function deserialize_msg(s::AbstractSerializer)
2,328✔
93
        idx = read(s.io, UInt8)
2,328✔
94
        return $msg_cases
2,328✔
95
    end
96
end
97

98
function send_msg_unknown(s::IO, header, msg)
×
99
    error("attempt to send to unknown socket")
×
100
end
101

102
function send_msg(s::IO, header, msg)
×
103
    id = worker_id_from_socket(s)
×
104
    if id > -1
×
105
        return send_msg(worker_from_id(id), header, msg)
×
106
    end
107
    send_msg_unknown(s, header, msg)
×
108
end
109

110
function send_msg_now(s::IO, header, msg::AbstractMsg)
487✔
111
    id = worker_id_from_socket(s)
487✔
112
    if id > -1
487✔
113
        return send_msg_now(worker_from_id(id), header, msg)
487✔
114
    end
115
    send_msg_unknown(s, header, msg)
×
116
end
117
function send_msg_now(w::Worker, header, msg)
572✔
118
    send_msg_(w, header, msg, true)
572✔
119
end
120

121
function send_msg(w::Worker, header, msg)
1,750✔
122
    send_msg_(w, header, msg, false)
1,750✔
123
end
124

125
function flush_gc_msgs(w::Worker)
195✔
126
    if !isdefined(w, :w_stream)
195✔
127
        return
×
128
    end
129
    add_msgs = nothing
×
130
    del_msgs = nothing
×
131
    @lock w.msg_lock begin
390✔
132
        if !w.gcflag # No work needed for this worker
195✔
133
            return
×
134
        end
135
        @atomic w.gcflag = false
195✔
136
        if !isempty(w.add_msgs)
195✔
137
            add_msgs = w.add_msgs
×
138
            w.add_msgs = Any[]
×
139
        end
140

141
        if !isempty(w.del_msgs)
195✔
142
            del_msgs = w.del_msgs
195✔
143
            w.del_msgs = Any[]
195✔
144
        end
145
    end
146
    if add_msgs !== nothing
195✔
147
        remote_do(add_clients, w, add_msgs)
×
148
    end
149
    if del_msgs !== nothing
195✔
150
        remote_do(del_clients, w, del_msgs)
195✔
151
    end
152
    return
195✔
153
end
154

155
# Boundary inserted between messages on the wire, used for recovering
156
# from deserialization errors. Picked arbitrarily.
157
# A size of 10 bytes indicates ~ ~1e24 possible boundaries, so chance of collision
158
# with message contents is negligible.
159
const MSG_BOUNDARY = UInt8[0x79, 0x8e, 0x8e, 0xf5, 0x6e, 0x9b, 0x2e, 0x97, 0xd5, 0x7d]
160

161
# Faster serialization/deserialization of MsgHeader and RRID
162
function serialize_hdr_raw(io, hdr)
2,322✔
163
    write(io, hdr.response_oid.whence, hdr.response_oid.id, hdr.notify_oid.whence, hdr.notify_oid.id)
2,322✔
164
end
165

166
function deserialize_hdr_raw(io)
2,413✔
167
    data = read!(io, Ref{NTuple{4,Int}}())[]
2,413✔
168
    return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4]))
2,328✔
169
end
170

171
function send_msg_(w::Worker, header, msg, now::Bool)
2,322✔
172
    check_worker_state(w)
2,322✔
173
    if myid() != 1 && !isa(msg, IdentifySocketMsg) && !isa(msg, IdentifySocketAckMsg)
2,322✔
174
        wait(w.initialized)
914✔
175
    end
176
    io = w.w_stream
2,322✔
177
    lock(io)
2,322✔
178
    try
2,322✔
179
        reset_state(w.w_serializer)
2,322✔
180
        serialize_hdr_raw(io, header)
2,322✔
181
        invokelatest(serialize_msg, w.w_serializer, msg)  # io is wrapped in w_serializer
2,322✔
182
        write(io, MSG_BOUNDARY)
2,322✔
183

184
        if !now && w.gcflag
2,322✔
185
            flush_gc_msgs(w)
17✔
186
        else
187
            flush(io)
4,627✔
188
        end
189
    finally
190
        unlock(io)
2,322✔
191
    end
192
end
193

194
function flush_gc_msgs()
180✔
195
    try
180✔
196
        for w in (PGRP::ProcessGroup).workers
180✔
197
            if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
2,936✔
198
                flush_gc_msgs(w)
178✔
199
            end
200
        end
2,936✔
201
    catch e
202
        bt = catch_backtrace()
×
203
        @async showerror(stderr, e, bt)
×
204
    end
205
end
206

207
function send_connection_hdr(w::Worker, cookie=true)
85✔
208
    # For a connection initiated from the remote side to us, we only send the version,
209
    # else when we initiate a connection we first send the cookie followed by our version.
210
    # The remote side validates the cookie.
211
    if cookie
85✔
212
        write(w.w_stream, LPROC.cookie)
43✔
213
    end
214
    write(w.w_stream, rpad(VERSION_STRING, HDR_VERSION_LEN)[1:HDR_VERSION_LEN])
85✔
215
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc