• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37584

pending completion
#37584

push

local

web-flow
relax assertion involving pg->nold to reflect that it may be a bit inaccurate with parallel marking (#50466)

70958 of 83746 relevant lines covered (84.73%)

23916169.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.46
/stdlib/Distributed/src/clusterserialize.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
using Serialization: serialize_cycle, deserialize_cycle, writetag,
4
                     serialize_typename, deserialize_typename,
5
                     TYPENAME_TAG, TASK_TAG, reset_state, serialize_type
6
using Serialization.__deserialized_types__
7

8
import Serialization: object_number, lookup_object_number, remember_object
9

10
# Serializer state kept for one IO connection to another process.
#
# `io`, `counter`, `table` and `pending_refs` are presumably the fields the
# generic `Serialization` machinery expects of an `AbstractSerializer`
# (NOTE(review): confirm against the Serialization stdlib). The remaining fields
# track what has already been sent over this connection so that type names and
# the `Main` globals they reference are only (re)sent when needed.
mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer
    io::I
    counter::Int
    table::IdDict{Any,Any}
    pending_refs::Vector{Int}

    pid::Int                                     # Worker we are connected to.
    tn_obj_sent::Set{UInt64}                     # Identifiers of TypeName objects sent
    glbs_sent::Dict{Symbol, Tuple{UInt64, UInt64}}   # symbol => (hash_value, objectid)
    glbs_in_tnobj::Dict{UInt64, Vector{Symbol}}  # TypeName identifier => globals referenced
                                                 # while that type name was serialized.
    anonfunc_id::UInt64                          # Identifier of the TypeName currently being
                                                 # sent in full; 0 when there is none.

    function ClusterSerializer{I}(io::I) where I<:IO
        # Every container is created with exactly the declared field type so that
        # `new` does not have to convert anything.
        new(io, 0, IdDict(), Int[], worker_id_from_socket(io),
            Set{UInt64}(), Dict{Symbol, Tuple{UInt64, UInt64}}(),
            Dict{UInt64, Vector{Symbol}}(), 0)
    end
end
# Convenience constructor: infer the type parameter from the stream itself.
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)
170✔
29

30
# Object => number table, filled in by `object_number` and (for type names) by
# `remember_object`. Being a `WeakKeyDict`, it holds its keys weakly.
const object_numbers = WeakKeyDict()
31
# Running counter mixed into every newly assigned object number; `object_number`
# increments it after each assignment.
const obj_number_salt = Ref(0)
32
# Return the number associated with object `l`, assigning a new one the first
# time `l` is seen.
function object_number(s::ClusterSerializer, @nospecialize(l))
    known = get(object_numbers, l, nothing)
    known === nothing || return known
    # A number that is always the same for the same object on the same machine,
    # and is unique over all machines: the local running counter combined with
    # this process's id shifted into the high bits.
    fresh = obj_number_salt[] + (UInt64(myid()) << 44)
    obj_number_salt[] += 1
    object_numbers[l] = fresh
    return fresh::UInt64
end
44

45
# Number => object table: written by `remember_object`, read by
# `lookup_object_number`.
const known_object_data = Dict{UInt64,Any}()
46

47
# Return the object registered under number `n`, or `nothing` when no object is
# known for it.
lookup_object_number(s::ClusterSerializer, n::UInt64) =
    get(known_object_data, n, nothing)
50

51
# Register object `o` under number `n` so that later lookups by number find it.
# Always returns `nothing`.
function remember_object(s::ClusterSerializer, @nospecialize(o), n::UInt64)
    known_object_data[n] = o
    if o isa Core.TypeName
        # set up reverse mapping for serialize, unless the type name already has
        # a number of its own
        haskey(object_numbers, o) || (object_numbers[o] = n)
    end
    return nothing
end
59

60
# Read a `Core.TypeName` written by the matching `serialize` method.
#
# The stream holds, in order: a flag telling whether the full type-name body
# follows, the type name's UInt64 number, the body (only if the flag is set),
# a collection of global symbols, and one serialized global per symbol.
function deserialize(s::ClusterSerializer, ::Type{Core.TypeName})
    full_body_sent = deserialize(s)
    number = read(s.io, UInt64)
    tn = if !full_body_sent
        # Only the number was sent: the type name must already be known here.
        known = lookup_object_number(s, number)::Core.TypeName
        remember_object(s, known, number)
        deserialize_cycle(s, known)
        known
    else
        deserialize_typename(s, number)
    end

    # retrieve arrays of global syms sent if any and deserialize them all.
    for sym in deserialize(s)
        deserialize_global_from_main(s, sym)
    end
    return tn
end
75

76
# Write a `Core.TypeName` to the connection.
#
# The full type-name body is sent only the first time a given type name goes
# over this serializer; afterwards only its number is written. In both cases the
# `Main` globals recorded for the type name that need (re)sending follow: first
# the list of symbols, then one serialized global per symbol.
function serialize(s::ClusterSerializer, t::Core.TypeName)
    # Nothing more to write when `serialize_cycle` reports it handled `t`.
    serialize_cycle(s, t) && return
    writetag(s.io, TYPENAME_TAG)

    identifier = object_number(s, t)
    send_whole = !(identifier in s.tn_obj_sent)
    serialize(s, send_whole)
    write(s.io, identifier)
    if send_whole
        # Track globals referenced in this anonymous function.
        # This information is used to resend modified globals when we
        # only send the identifier.
        prev = s.anonfunc_id
        s.anonfunc_id = identifier
        try
            serialize_typename(s, t)
        finally
            # Restore the previous id even when serialization throws; this
            # serializer outlives the failed message, and a stale id would make
            # later GlobalRefs be recorded against the wrong type name.
            s.anonfunc_id = prev
        end
        push!(s.tn_obj_sent, identifier)
        # Drop the recorded globals once the type name itself is finalized.
        finalizer(t) do x
            cleanup_tname_glbs(s, identifier)
        end
    end

    # Send global refs if required.
    syms = syms_2b_sent(s, identifier)
    serialize(s, syms)
    foreach(sym->serialize_global_from_main(s, sym), syms)
    return nothing
end
104

105
# Serialize a `GlobalRef`, first noting it against the type name currently being
# sent in full (if any) when it refers to a global owned by `Main`.
function serialize(s::ClusterSerializer, g::GlobalRef)
    # Record if required and then invoke the default GlobalRef serializer.
    sym = g.name
    if g.mod === Main && isdefined(g.mod, sym)
        if (binding_module(Main, sym) === Main) && (s.anonfunc_id != 0) &&
            !startswith(string(sym), "#") # Anonymous functions are handled via FULL_GLOBALREF_TAG

            # The typed default is only built when the key is missing, and it
            # already has the dictionary's value type.
            push!(get!(() -> Symbol[], s.glbs_in_tnobj, s.anonfunc_id), sym)
        end
    end

    return invoke(serialize, Tuple{AbstractSerializer, GlobalRef}, s, g)
end
118

119
# Send/resend a global binding if
120
# a) has not been sent previously, i.e., we are seeing this binding for the first time, or,
121
# b) hash value has changed or
122
# c) hash value is same but of a different object, i.e. objectid has changed or
123
# d) is a bits type
124
# Return the de-duplicated list of `Main` globals recorded for type name
# `identifier` that have to be sent (or sent again) over this serializer.
function syms_2b_sent(s::ClusterSerializer, identifier)
    pending = Symbol[]
    for sym in get(s.glbs_in_tnobj, identifier, Symbol[])
        v = getfield(Main, sym)

        # Bits-type values are always sent.
        if isbits(v)
            push!(pending, sym)
            continue
        end

        sent = get(s.glbs_sent, sym, nothing)
        if sent === nothing
            # First time this binding is seen on this connection.
            push!(pending, sym)
        else
            # We have sent this binding before, see if it has changed.
            hval, oid = sent
            if hval != hash(sym, hash(v)) || oid != objectid(v)
                push!(pending, sym)
            end
        end
    end
    return unique(pending)
end
146

147
# Write the `Main` global named `sym`: first whether the binding is constant,
# then its current value. For non-bits values the (hash, objectid) pair is
# stored in `s.glbs_sent` so later calls can tell whether it changed.
function serialize_global_from_main(s::ClusterSerializer, sym)
    v = getfield(Main, sym)

    isbits(v) || (s.glbs_sent[sym] = (hash(sym, hash(v)), objectid(v)))

    serialize(s, isconst(Main, sym))
    return serialize(s, v)
end
157

158
# Read one global written by `serialize_global_from_main` and install it in
# `Main` under the name `sym`. Always returns `nothing`.
function deserialize_global_from_main(s::ClusterSerializer, sym)
    sym_isconst = deserialize(s)
    v = deserialize(s)

    # An existing binding is left alone when either side treats it as constant;
    # warn only if the incoming value differs from the one already there.
    if isdefined(Main, sym) && (sym_isconst || isconst(Main, sym))
        if !isequal(getfield(Main, sym), v)
            @warn "Cannot transfer global variable $sym; it already has a value."
        end
        return nothing
    end

    if sym_isconst
        ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
    else
        setglobal!(Main, sym, v)
    end
    return nothing
end
177

178
# Forget the globals recorded for the type name numbered `identifier`.
cleanup_tname_glbs(s::ClusterSerializer, identifier) =
    delete!(s.glbs_in_tnobj, identifier)
181

182
# TODO: cleanup from s.tn_obj_sent
183

184

185
# Specialized serialize-deserialize implementations for CapturedException to partially
186
# recover from any deserialization errors in `CapturedException.ex`
187

188
# Write a `CapturedException` so that the reader can still report something
# useful when the wrapped exception itself fails to deserialize: the pieces
# least likely to fail go first, the wrapped exception goes last.
function serialize(s::ClusterSerializer, ex::CapturedException)
    wrapped = ex.ex
    serialize_type(s, typeof(ex))
    # String type should not result in a deser error
    serialize(s, string(typeof(wrapped)))
    # Currently should not result in a deser error
    serialize(s, ex.processed_bt)
    # can result in a UndefVarError on the remote node
    # if a type used in ex.ex is undefined on the remote node.
    return serialize(s, wrapped)
end
195

196
# Build the `ErrorException` reported when a remote exception could not be
# deserialized. `ex_str` is the type name of the original exception and
# `remote_stktrace` selects the "Remote" or "Local" stacktrace label.
function original_ex(s::ClusterSerializer, ex_str, remote_stktrace)
    # Best effort only: if the worker id cannot be determined, leave it out.
    pid_str = try
        string(" from worker ", worker_id_from_socket(s.io))
    catch
        ""
    end

    stk_str = remote_stktrace ? "Remote" : "Local"
    msg = string("Error deserializing a remote exception", pid_str, "\n",
                 "Remote(original) exception of type ", ex_str, "\n",
                 stk_str, " stacktrace : ")
    return ErrorException(msg)
end
208

209
# Read a `CapturedException` written by the matching `serialize` method. If the
# backtrace or the wrapped exception fails to deserialize, a
# `CompositeException` is thrown that pairs a description of the original
# exception with the deserialization failure.
function deserialize(s::ClusterSerializer, t::Type{<:CapturedException})
    ex_str = deserialize(s)

    bt = try
        deserialize(s)
    catch e
        throw(CompositeException([
            original_ex(s, ex_str, false),
            CapturedException(e, catch_backtrace())
        ]))
    end

    capex = try
        deserialize(s)
    catch e
        # The remote backtrace was read successfully, so attach it.
        throw(CompositeException([
            CapturedException(original_ex(s, ex_str, true), bt),
            CapturedException(e, catch_backtrace())
        ]))
    end

    return CapturedException(capex, bt)
end
233

234
"""
235
    clear!(syms, pids=workers(); mod=Main)
236

237
Clears global bindings in modules by initializing them to `nothing`.
238
`syms` should be of type [`Symbol`](@ref) or a collection of `Symbol`s. `pids` and `mod`
239
identify the processes and the module in which global variables are to be
240
reinitialized. Only those names found to be defined under `mod` are cleared.
241

242
An exception is raised if a global constant is requested to be cleared.
243
"""
244
function clear!(syms, pids=workers(); mod=Main)
    # Issue one clearing call per process and return only after all of them
    # have completed.
    @sync begin
        for pid in pids
            @async_unwrap remotecall_wait(clear_impl!, pid, syms, mod)
        end
    end
end
249
# Convenience methods: wrap a single `Symbol` and/or a single process id in a
# one-element vector and forward to the general method.
clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod)
clear!(sym::Symbol, pids=workers(); mod=Main) = clear!([sym], pids; mod)
clear!(syms, pid::Int; mod=Main) = clear!(syms, [pid]; mod)
×
252

253
# Clear every name in the collection `syms` inside module `mod`.
function clear_impl!(syms, mod::Module)
    for sym in syms
        clear_impl!(sym, mod)
    end
    return nothing
end

# Rebind the global `sym` in `mod` to `nothing` if it is defined there.
# Evaluates to `false` when `sym` is not defined in `mod`.
function clear_impl!(sym::Symbol, mod::Module)
    return isdefined(mod, sym) && Core.eval(mod, :(global $sym = nothing))
end
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc