• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37430

pending completion
#37430

push

local

web-flow
Use `sum` in `count` for constant folding of type based predicates (#48454)

* Use `sum` in `count` for constant folding for type based predicates.

* Use existing `_bool` functionality for type assertion

---------

Co-authored-by: Sukera <Seelengrab@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

69787 of 75048 relevant lines covered (92.99%)

34159588.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.02
/stdlib/Distributed/src/messages.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
abstract type AbstractMsg end
4

5

6
## Wire format description
7
#
8
# Each message has three parts, which are written in order to the worker's stream.
9
#  1) A header of type MsgHeader is serialized to the stream (via `serialize`).
10
#  2) A message of type AbstractMsg is then serialized.
11
#  3) Finally, a fixed boundary of 10 bytes is written.
12

13
# Message header stored separately from body to be able to send back errors if
14
# a deserialization error occurs when reading the message body.
15
struct MsgHeader
16
    response_oid::RRID
17
    notify_oid::RRID
18
    MsgHeader(respond_oid=RRID(0,0), notify_oid=RRID(0,0)) =
76,360✔
19
        new(respond_oid, notify_oid)
20
end
21

22
# Special oid (0,0) uses to indicate a null ID.
23
# Used instead of Union{Int, Nothing} to decrease wire size of header.
24
null_id(id) =  id == RRID(0, 0)
18✔
25

26
struct CallMsg{Mode} <: AbstractMsg
27
    f::Any
29,673✔
28
    args::Tuple
29
    kwargs
30
end
31
struct CallWaitMsg <: AbstractMsg
32
    f::Any
540✔
33
    args::Tuple
34
    kwargs
35
end
36
struct RemoteDoMsg <: AbstractMsg
37
    f::Any
2,407✔
38
    args::Tuple
39
    kwargs
40
end
41
struct ResultMsg <: AbstractMsg
42
    value::Any
29,312✔
43
end
44

45

46
# Worker initialization messages
47
struct IdentifySocketMsg <: AbstractMsg
48
    from_pid::Int
128✔
49
end
50

51
struct IdentifySocketAckMsg <: AbstractMsg
52
end
128✔
53

54
struct JoinPGRPMsg <: AbstractMsg
55
    self_pid::Int
203✔
56
    other_workers::Array
57
    topology::Symbol
58
    enable_threaded_blas::Bool
59
    lazy::Bool
60
end
61
struct JoinCompleteMsg <: AbstractMsg
62
    cpu_threads::Int
203✔
63
    ospid::Int
64
end
65

66
# Avoiding serializing AbstractMsg containers results in a speedup
67
# of approximately 10%. Can be removed once module Serialization
68
# has been suitably improved.
69

70
const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg,
71
                     JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg,
72
                     CallMsg{:call}, CallMsg{:call_fetch}]
73

74
for (idx, tname) in enumerate(msgtypes)
75
    exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ]
76
    @eval function serialize_msg(s::AbstractSerializer, o::$tname)
31,274✔
77
        write(s.io, UInt8($idx))
31,274✔
78
        $(exprs...)
36,689✔
79
        return nothing
31,274✔
80
    end
81
end
82

83
let msg_cases = :(@assert false "Message type index ($idx) expected to be between 1:$($(length(msgtypes)))")
84
    for i = length(msgtypes):-1:1
85
        mti = msgtypes[i]
86
        msg_cases = :(if idx == $i
87
                          $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), fieldcount(mti))...))
31,293✔
88
                      else
89
                          $msg_cases
213,059✔
90
                      end)
91
    end
92
    @eval function deserialize_msg(s::AbstractSerializer)
31,293✔
93
        idx = read(s.io, UInt8)
31,293✔
94
        return $msg_cases
31,293✔
95
    end
96
end
97

98
function send_msg_unknown(s::IO, header, msg)
×
99
    error("attempt to send to unknown socket")
×
100
end
101

102
function send_msg(s::IO, header, msg)
103
    id = worker_id_from_socket(s)
104
    if id > -1
105
        return send_msg(worker_from_id(id), header, msg)
106
    end
107
    send_msg_unknown(s, header, msg)
108
end
109

110
function send_msg_now(s::IO, header, msg::AbstractMsg)
14,656✔
111
    id = worker_id_from_socket(s)
14,656✔
112
    if id > -1
14,656✔
113
        return send_msg_now(worker_from_id(id), header, msg)
14,656✔
114
    end
115
    send_msg_unknown(s, header, msg)
×
116
end
117
function send_msg_now(w::Worker, header, msg)
14,987✔
118
    send_msg_(w, header, msg, true)
14,987✔
119
end
120

121
function send_msg(w::Worker, header, msg)
16,320✔
122
    send_msg_(w, header, msg, false)
16,320✔
123
end
124

125
function flush_gc_msgs(w::Worker)
383✔
126
    if !isdefined(w, :w_stream)
383✔
127
        return
×
128
    end
129
    add_msgs = nothing
×
130
    del_msgs = nothing
×
131
    @lock w.msg_lock begin
766✔
132
        if !w.gcflag # No work needed for this worker
383✔
133
            return
×
134
        end
135
        @atomic w.gcflag = false
383✔
136
        if !isempty(w.add_msgs)
383✔
137
            add_msgs = w.add_msgs
4✔
138
            w.add_msgs = Any[]
4✔
139
        end
140

141
        if !isempty(w.del_msgs)
383✔
142
            del_msgs = w.del_msgs
379✔
143
            w.del_msgs = Any[]
379✔
144
        end
145
    end
146
    if add_msgs !== nothing
383✔
147
        remote_do(add_clients, w, add_msgs)
4✔
148
    end
149
    if del_msgs !== nothing
383✔
150
        remote_do(del_clients, w, del_msgs)
379✔
151
    end
152
    return
382✔
153
end
154

155
# Boundary inserted between messages on the wire, used for recovering
156
# from deserialization errors. Picked arbitrarily.
157
# A size of 10 bytes indicates ~ ~1e24 possible boundaries, so chance of collision
158
# with message contents is negligible.
159
const MSG_BOUNDARY = UInt8[0x79, 0x8e, 0x8e, 0xf5, 0x6e, 0x9b, 0x2e, 0x97, 0xd5, 0x7d]
160

161
# Faster serialization/deserialization of MsgHeader and RRID
162
function serialize_hdr_raw(io, hdr)
31,274✔
163
    write(io, hdr.response_oid.whence, hdr.response_oid.id, hdr.notify_oid.whence, hdr.notify_oid.id)
31,274✔
164
end
165

166
function deserialize_hdr_raw(io)
31,624✔
167
    data = read!(io, Ref{NTuple{4,Int}}())[]
31,624✔
168
    return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4]))
31,293✔
169
end
170

171
function send_msg_(w::Worker, header, msg, now::Bool)
31,303✔
172
    check_worker_state(w)
31,303✔
173
    if myid() != 1 && !isa(msg, IdentifySocketMsg) && !isa(msg, IdentifySocketAckMsg)
31,270✔
174
        wait(w.initialized)
15,310✔
175
    end
176
    io = w.w_stream
31,270✔
177
    lock(io)
31,270✔
178
    try
31,270✔
179
        reset_state(w.w_serializer)
31,270✔
180
        serialize_hdr_raw(io, header)
31,270✔
181
        invokelatest(serialize_msg, w.w_serializer, msg)  # io is wrapped in w_serializer
31,270✔
182
        write(io, MSG_BOUNDARY)
31,270✔
183

184
        if !now && w.gcflag
31,270✔
185
            flush_gc_msgs(w)
45✔
186
        else
187
            flush(io)
62,494✔
188
        end
189
    finally
190
        unlock(io)
31,269✔
191
    end
192
end
193

194
function flush_gc_msgs()
268✔
195
    try
268✔
196
        for w in (PGRP::ProcessGroup).workers
268✔
197
            if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
3,345✔
198
                flush_gc_msgs(w)
338✔
199
            end
200
        end
3,344✔
201
    catch e
202
        bt = catch_backtrace()
×
203
        @async showerror(stderr, e, bt)
×
204
    end
205
end
206

207
function send_connection_hdr(w::Worker, cookie=true)
331✔
208
    # For a connection initiated from the remote side to us, we only send the version,
209
    # else when we initiate a connection we first send the cookie followed by our version.
210
    # The remote side validates the cookie.
211
    if cookie
331✔
212
        write(w.w_stream, LPROC.cookie)
166✔
213
    end
214
    write(w.w_stream, rpad(VERSION_STRING, HDR_VERSION_LEN)[1:HDR_VERSION_LEN])
331✔
215
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc