• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37451

pending completion
#37451

push

local

web-flow
Document stability for rev=true in sort! (#48759)

72199 of 82566 relevant lines covered (87.44%)

35033312.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.91
/stdlib/Distributed/src/messages.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
# Common supertype of every message struct declared in this file.
abstract type AbstractMsg end
4

5

6
## Wire format description
7
#
8
# Each message has three parts, which are written in order to the worker's stream.
9
#  1) A header of type MsgHeader is serialized to the stream (via `serialize`).
10
#  2) A message of type AbstractMsg is then serialized.
11
#  3) Finally, a fixed boundary of 10 bytes is written.
12

13
# Message header stored separately from body to be able to send back errors if
14
# a deserialization error occurs when reading the message body.
15
# Holds the two RRIDs that travel with every message. Either constructor
# argument may be omitted, in which case it defaults to RRID(0,0).
struct MsgHeader
    response_oid::RRID
    notify_oid::RRID
    function MsgHeader(respond_oid=RRID(0,0), notify_oid=RRID(0,0))
        return new(respond_oid, notify_oid)
    end
end
21

22
# Special oid (0,0) is used to indicate a null ID.
23
# Used instead of Union{Int, Nothing} to decrease wire size of header.
24
# Return `true` when `id` compares equal to RRID(0, 0).
function null_id(id)
    return id == RRID(0, 0)
end
18✔
25

26
# Message parameterized by `Mode` (this file registers the `:call` and
# `:call_fetch` variants). Fields: `f`, the positional `args` tuple, and `kwargs`.
struct CallMsg{Mode} <: AbstractMsg
    f
    args::Tuple
    kwargs::Any
end
31
# Message with the same three fields as CallMsg: `f`, the positional `args`
# tuple, and `kwargs`.
struct CallWaitMsg <: AbstractMsg
    f
    args::Tuple
    kwargs::Any
end
36
# Message with the fields `f`, the positional `args` tuple, and `kwargs`.
struct RemoteDoMsg <: AbstractMsg
    f
    args::Tuple
    kwargs::Any
end
41
# Message wrapping a single, arbitrarily typed `value`.
struct ResultMsg <: AbstractMsg
    value
end
44

45

46
# Worker initialization messages
47
# Initialization message carrying one integer field, `from_pid`.
struct IdentifySocketMsg <: AbstractMsg
    from_pid::Int
end
50

51
# Initialization message with no fields; only its type is transmitted.
struct IdentifySocketAckMsg <: AbstractMsg end
128✔
53

54
# Initialization message with five fields. Field meanings are not visible in
# this file beyond their names and types.
struct JoinPGRPMsg <: AbstractMsg
    self_pid::Int
    other_workers::Array
    topology::Symbol
    enable_threaded_blas::Bool
    lazy::Bool
end
61
# Initialization message carrying two integers: `cpu_threads` and `ospid`.
struct JoinCompleteMsg <: AbstractMsg
    cpu_threads::Int
    ospid::Int
end
65

66
# Avoiding serializing AbstractMsg containers results in a speedup
67
# of approximately 10%. Can be removed once module Serialization
68
# has been suitably improved.
69

70
# Message types that get a generated serialize_msg/deserialize_msg fast path.
# A type's 1-based position in this list is the tag byte written to the stream,
# so reordering the entries changes the wire format.
const msgtypes = Any[
    CallWaitMsg,
    IdentifySocketAckMsg,
    IdentifySocketMsg,
    JoinCompleteMsg,
    JoinPGRPMsg,
    RemoteDoMsg,
    ResultMsg,
    CallMsg{:call},
    CallMsg{:call_fetch},
]
73

74
# Generate one `serialize_msg` method per entry of `msgtypes`. Each method writes
# the entry's 1-based index to `s.io` as a single UInt8 tag, then serializes
# every field of the message in declaration order, and returns `nothing`.
for (tag, msg_type) in enumerate(msgtypes)
    field_writes = Any[:(serialize(s, o.$name)) for name in fieldnames(msg_type)]
    @eval function serialize_msg(s::AbstractSerializer, o::$msg_type)
        write(s.io, $(UInt8(tag)))
        $(field_writes...)
        return nothing
    end
end
82

83
# Build `deserialize_msg` as a nested chain of `if idx == i ... else ... end`
# tests, one per entry of `msgtypes`. The chain is assembled from the last entry
# to the first so that index 1 ends up as the outermost test. A matching branch
# calls the message type's constructor with one `deserialize(s)` per field; the
# innermost `else` is the assertion failure for an unknown index.
# NOTE(review): the fallback relies on `@assert`, which Julia documents may be
# disabled at some optimization levels -- confirm this is acceptable for a tag
# byte read from the stream.
let dispatch = :(@assert false "Message type index ($idx) expected to be between 1:$($(length(msgtypes)))")
    for i in reverse(eachindex(msgtypes))
        msg_type = msgtypes[i]
        field_reads = fill(:(deserialize(s)), fieldcount(msg_type))
        construct = Expr(:call, QuoteNode(msg_type), field_reads...)
        dispatch = :(if idx == $i
                         $construct
                     else
                         $dispatch
                     end)
    end
    @eval function deserialize_msg(s::AbstractSerializer)
        idx = read(s.io, UInt8)
        return $dispatch
    end
end
97

98
# Always throws an error reporting an attempt to send to an unknown socket.
# None of the arguments are inspected.
send_msg_unknown(s::IO, header, msg) = error("attempt to send to unknown socket")
101

102
# Send `msg` given only a stream: look up the worker id registered for `s` and
# forward to the Worker method. When the lookup does not yield an id greater
# than -1, fall through to `send_msg_unknown`.
function send_msg(s::IO, header, msg)
    wid = worker_id_from_socket(s)
    wid > -1 || return send_msg_unknown(s, header, msg)
    return send_msg(worker_from_id(wid), header, msg)
end
109

110
# Stream-based entry point for `send_msg_now`: look up the worker id registered
# for `s` and forward to the Worker method. When the lookup does not yield an id
# greater than -1, fall through to `send_msg_unknown`.
function send_msg_now(s::IO, header, msg::AbstractMsg)
    wid = worker_id_from_socket(s)
    wid > -1 || return send_msg_unknown(s, header, msg)
    return send_msg_now(worker_from_id(wid), header, msg)
end
117
# Send `msg` to worker `w` by calling `send_msg_` with `now = true`.
send_msg_now(w::Worker, header, msg) = send_msg_(w, header, msg, true)
120

121
# Send `msg` to worker `w` by calling `send_msg_` with `now = false`.
send_msg(w::Worker, header, msg) = send_msg_(w, header, msg, false)
124

125
# Send the add/del client messages queued on worker `w`, if there are any.
# Returns `nothing`. Nothing is sent when `w` has no `w_stream` defined or when
# `w.gcflag` is not set.
function flush_gc_msgs(w::Worker)
    isdefined(w, :w_stream) || return
    pending_adds = nothing
    pending_dels = nothing
    # Take ownership of the queued lists while holding the lock; the remote
    # calls below run after the lock has been released.
    @lock w.msg_lock begin
        w.gcflag || return # No work needed for this worker
        @atomic w.gcflag = false
        if !isempty(w.add_msgs)
            pending_adds = w.add_msgs
            w.add_msgs = Any[]
        end
        if !isempty(w.del_msgs)
            pending_dels = w.del_msgs
            w.del_msgs = Any[]
        end
    end
    pending_adds === nothing || remote_do(add_clients, w, pending_adds)
    pending_dels === nothing || remote_do(del_clients, w, pending_dels)
    return
end
154

155
# Boundary inserted between messages on the wire, used for recovering
156
# from deserialization errors. Picked arbitrarily.
157
# A size of 10 bytes gives ~1e24 possible boundaries, so the chance of collision
158
# with message contents is negligible.
159
# The 10 boundary bytes; `send_msg_` writes them after each serialized message.
const MSG_BOUNDARY = UInt8[
    0x79, 0x8e, 0x8e, 0xf5, 0x6e,
    0x9b, 0x2e, 0x97, 0xd5, 0x7d,
]
160

161
# Faster serialization/deserialization of MsgHeader and RRID
162
# Write the header's four values to `io` with plain `write`, in the order
# response whence, response id, notify whence, notify id.
# Returns what the multi-argument `write` call returns.
function serialize_hdr_raw(io, hdr)
    response, notify = hdr.response_oid, hdr.notify_oid
    return write(io, response.whence, response.id, notify.whence, notify.id)
end
165

166
# Read four `Int`s from `io` in one `read!` call and rebuild the MsgHeader:
# the first pair becomes the first RRID, the second pair the second RRID.
function deserialize_hdr_raw(io)
    buf = Ref{NTuple{4,Int}}()
    read!(io, buf)
    response_whence, response_id, notify_whence, notify_id = buf[]
    return MsgHeader(RRID(response_whence, response_id), RRID(notify_whence, notify_id))
end
170

171
# Write one complete message to worker `w`: the raw header, the serialized
# message body, and finally MSG_BOUNDARY, all while holding the stream's lock.
#
# On any process whose id is not 1, sending waits on `w.initialized` first,
# unless `msg` is one of the two IdentifySocket messages.
#
# After writing, the stream is flushed directly when `now` is true or
# `w.gcflag` is not set; otherwise `flush_gc_msgs(w)` is called instead.
function send_msg_(w::Worker, header, msg, now::Bool)
    check_worker_state(w)
    if myid() != 1 && !(msg isa Union{IdentifySocketMsg, IdentifySocketAckMsg})
        wait(w.initialized)
    end
    stream = w.w_stream
    @lock stream begin
        reset_state(w.w_serializer)
        serialize_hdr_raw(stream, header)
        invokelatest(serialize_msg, w.w_serializer, msg)  # stream is wrapped in w_serializer
        write(stream, MSG_BOUNDARY)

        if now || !w.gcflag
            flush(stream)
        else
            flush_gc_msgs(w)
        end
    end
end
193

194
# Call `flush_gc_msgs(w)` for every entry of the process group's worker list
# that is a Worker, is in state W_CONNECTED, and has `gcflag` set.
# Any exception is caught and printed to stderr, with its backtrace, from an
# asynchronous task rather than being rethrown.
function flush_gc_msgs()
    try
        for peer in (PGRP::ProcessGroup).workers
            peer isa Worker || continue
            peer.state == W_CONNECTED || continue
            peer.gcflag && flush_gc_msgs(peer)
        end
    catch err
        trace = catch_backtrace()
        @async showerror(stderr, err, trace)
    end
end
206

207
# Write the connection header to `w.w_stream`.
#
# For a connection initiated from the remote side to us, only the version is
# sent (`cookie = false`). When we initiate the connection, the cookie is sent
# first, followed by our version; the remote side validates the cookie.
#
# The version string is padded with `rpad` and cut so that exactly
# HDR_VERSION_LEN code units are written.
function send_connection_hdr(w::Worker, cookie=true)
    stream = w.w_stream
    cookie && write(stream, LPROC.cookie)
    version_field = rpad(VERSION_STRING, HDR_VERSION_LEN)[1:HDR_VERSION_LEN]
    return write(stream, version_field)
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc