• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37433

pending completion
#37433

push

local

web-flow
Merge pull request #48513 from JuliaLang/jn/extend-once

ensure extension triggers are only run by the package that satisfied them

60 of 60 new or added lines in 1 file covered. (100.0%)

72324 of 82360 relevant lines covered (87.81%)

31376331.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.16
/stdlib/Distributed/src/clusterserialize.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
using Serialization: serialize_cycle, deserialize_cycle, writetag,
4
                     serialize_typename, deserialize_typename,
5
                     TYPENAME_TAG, TASK_TAG, reset_state, serialize_type
6
using Serialization.__deserialized_types__
7

8
import Serialization: object_number, lookup_object_number, remember_object
9

10
# Serializer state kept for one `io` stream. In addition to the stream and the
# `counter`/`table`/`pending_refs` bookkeeping fields, it records which TypeName
# objects and which `Main` globals have already been written to this stream.
mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer
    io::I
    counter::Int
    table::IdDict{Any,Any}
    pending_refs::Vector{Int}

    pid::Int                                     # Worker we are connected to.
    tn_obj_sent::Set{UInt64}                     # TypeName objects sent
    glbs_sent::Dict{Symbol, Tuple{UInt64, UInt64}}   # symbol => (hash_value, objectid)
    glbs_in_tnobj::Dict{UInt64, Vector{Symbol}}  # Track globals referenced in
                                                 # anonymous functions.
    anonfunc_id::UInt64

    function ClusterSerializer{I}(io::I) where I<:IO
        # Every container is created with exactly the declared field type.
        # `glbs_sent` used to be built as `Dict{UInt64, UInt64}()`, which only
        # worked because an empty dictionary converts implicitly to the field type.
        new(io, 0, IdDict(), Int[], worker_id_from_socket(io),
            Set{UInt64}(), Dict{Symbol, Tuple{UInt64, UInt64}}(),
            Dict{UInt64, Vector{Symbol}}(), 0)
    end
end

# Convenience constructor: infer the stream type parameter from `io`.
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)
666✔
29

30
# Process-wide table mapping an object to the number assigned to it, and the
# running counter from which new numbers are derived.
const object_numbers = WeakKeyDict()
const obj_number_salt = Ref(0)

# Return the number associated with `l`, assigning a fresh one on first use.
# A fresh number is the current counter value plus `myid()` shifted left by 44
# bits: a hash function that always gives the same number to the same object
# on the same machine, and is unique over all machines.
function object_number(s::ClusterSerializer, @nospecialize(l))
    haskey(object_numbers, l) && return object_numbers[l]

    fresh = obj_number_salt[] + (UInt64(myid()) << 44)
    obj_number_salt[] += 1
    object_numbers[l] = fresh
    return fresh::UInt64
end
44

45
# Objects remembered by number; filled by `remember_object`.
const known_object_data = Dict{UInt64,Any}()

# Return the object remembered under number `n`, or `nothing` if there is none.
lookup_object_number(s::ClusterSerializer, n::UInt64) = get(known_object_data, n, nothing)
50

51
# Record `o` under number `n`. For a `Core.TypeName` that has no number yet,
# also store the reverse mapping (object => number) used when serializing.
# Always returns `nothing`.
function remember_object(s::ClusterSerializer, @nospecialize(o), n::UInt64)
    known_object_data[n] = o
    o isa Core.TypeName || return nothing
    haskey(object_numbers, o) && return nothing
    # set up reverse mapping for serialize
    object_numbers[o] = n
    return nothing
end
59

60
# Read a TypeName written by the matching `serialize` method: a flag telling
# whether the full body follows, the object number, the body (only when
# flagged), and finally the list of global symbols whose values follow.
function deserialize(s::ClusterSerializer, ::Type{Core.TypeName})
    full_body_sent = deserialize(s)
    number = read(s.io, UInt64)
    tn = if !full_body_sent
        # Only the number was sent; the TypeName must already be known here.
        known = lookup_object_number(s, number)::Core.TypeName
        remember_object(s, known, number)
        deserialize_cycle(s, known)
        known
    else
        deserialize_typename(s, number)
    end

    # retrieve arrays of global syms sent if any and deserialize them all.
    for sym in deserialize(s)
        deserialize_global_from_main(s, sym)
    end
    return tn
end
75

76
# Write a TypeName. The full body is written only the first time its number is
# seen on this serializer; afterwards only the number is written. In both cases
# the globals reported by `syms_2b_sent` are written after it.
function serialize(s::ClusterSerializer, t::Core.TypeName)
    if serialize_cycle(s, t)
        return
    end
    writetag(s.io, TYPENAME_TAG)

    tn_id = object_number(s, t)
    first_time = tn_id ∉ s.tn_obj_sent
    serialize(s, first_time)
    write(s.io, tn_id)
    if first_time
        # Track globals referenced in this anonymous function.
        # This information is used to resend modified globals when we
        # only send the identifier.
        outer_id = s.anonfunc_id
        s.anonfunc_id = tn_id
        serialize_typename(s, t)
        s.anonfunc_id = outer_id
        push!(s.tn_obj_sent, tn_id)
        # Drop the tracked globals for this TypeName once `t` is finalized.
        finalizer(_ -> cleanup_tname_glbs(s, tn_id), t)
    end

    # Send global refs if required.
    pending = syms_2b_sent(s, tn_id)
    serialize(s, pending)
    for sym in pending
        serialize_global_from_main(s, sym)
    end
    return nothing
end
104

105
# Record the global if required, then invoke the default GlobalRef serializer.
#
# A global is recorded under the current `s.anonfunc_id` when the reference
# points into `Main`, the name is defined and bound in `Main` itself, a
# TypeName body is currently being serialized (`s.anonfunc_id != 0`), and the
# name does not start with "#".
function serialize(s::ClusterSerializer, g::GlobalRef)
    sym = g.name
    if g.mod === Main && isdefined(g.mod, sym)
        if (binding_module(Main, sym) === Main) && (s.anonfunc_id != 0) &&
            !startswith(string(sym), "#") # Anonymous functions are handled via FULL_GLOBALREF_TAG

            # The function-first form of `get!` builds the default lazily and
            # with the right element type, instead of allocating an untyped
            # `[]` on every call and relying on implicit conversion.
            tracked = get!(() -> Symbol[], s.glbs_in_tnobj, s.anonfunc_id)
            push!(tracked, sym)
        end
    end

    invoke(serialize, Tuple{AbstractSerializer, GlobalRef}, s, g)
end
118

119
# Return the (deduplicated) globals tracked for `identifier` that must be
# sent or resent. A global binding is sent/resent if
# a) it has not been sent previously, i.e., we are seeing this binding for the first time, or,
# b) its hash value has changed, or
# c) its hash value is the same but of a different object, i.e. objectid has changed, or
# d) it is a bits type.
function syms_2b_sent(s::ClusterSerializer, identifier)
    pending = Symbol[]
    for name in get(s.glbs_in_tnobj, identifier, Symbol[])
        val = getfield(Main, name)

        # Bits values and never-sent bindings are always (re)sent.
        if isbits(val) || !haskey(s.glbs_sent, name)
            push!(pending, name)
            continue
        end

        # We have sent this binding before, see if it has changed.
        prev_hash, prev_oid = s.glbs_sent[name]
        if prev_hash != hash(name, hash(val)) || prev_oid != objectid(val)
            push!(pending, name)
        end
    end
    return unique(pending)
end
146

147
# Write the `Main` global `sym` to the stream: first whether the binding is
# constant, then its value. For non-bits values, the (hash, objectid) pair is
# recorded in `s.glbs_sent`.
function serialize_global_from_main(s::ClusterSerializer, sym)
    v = getfield(Main, sym)

    isbits(v) || (s.glbs_sent[sym] = (hash(sym, hash(v)), objectid(v)))

    serialize(s, isconst(Main, sym))
    return serialize(s, v)
end
157

158
# Read one global written by `serialize_global_from_main` (const flag, then
# value) and install it in `Main` under `sym`. If `sym` is already defined and
# either side is constant, nothing is assigned; a warning is logged unless the
# existing value `isequal`s the received one. Always returns `nothing`.
function deserialize_global_from_main(s::ClusterSerializer, sym)
    sym_isconst = deserialize(s)
    v = deserialize(s)

    if isdefined(Main, sym) && (sym_isconst || isconst(Main, sym))
        # same value; ok -- otherwise warn and leave the binding untouched
        isequal(getfield(Main, sym), v) ||
            @warn "Cannot transfer global variable $sym; it already has a value."
        return nothing
    end

    if sym_isconst
        ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
    else
        setglobal!(Main, sym, v)
    end
    return nothing
end
177

178
# Forget the globals tracked for the TypeName numbered `identifier`.
cleanup_tname_glbs(s::ClusterSerializer, identifier) = delete!(s.glbs_in_tnobj, identifier)
181

182
# TODO: cleanup from s.tn_obj_sent
183

184

185
# Specialized serialize-deserialize implementations for CapturedException to partially
# recover from any deserialization errors in `CapturedException.ex`

# Writes, in order: the type of `ex`, the name of the wrapped exception's type
# as a string, the processed backtrace, and finally the wrapped exception.
function serialize(s::ClusterSerializer, ex::CapturedException)
    wrapped = ex.ex
    serialize_type(s, typeof(ex))
    # String type should not result in a deser error
    serialize(s, string(typeof(wrapped)))
    # Currently should not result in a deser error
    serialize(s, ex.processed_bt)
    # can result in a UndefVarError on the remote node
    # if a type used in ex.ex is undefined on the remote node.
    return serialize(s, wrapped)
end
195

196
# Build the placeholder `ErrorException` reported when a remote exception could
# not be deserialized. `ex_str` is the type name of the original exception;
# `remote_stktrace` selects the "Remote" or "Local" label for the stack trace
# heading. The worker id is included when it can be determined from `s.io`.
function original_ex(s::ClusterSerializer, ex_str, remote_stktrace)
    # Best effort: fall back to no worker information if the lookup throws.
    pid_str = try
        string(" from worker ", worker_id_from_socket(s.io))
    catch
        ""
    end

    stk_str = if remote_stktrace
        "Remote"
    else
        "Local"
    end

    msg = string("Error deserializing a remote exception", pid_str, "\n",
                 "Remote(original) exception of type ", ex_str, "\n",
                 stk_str,  " stacktrace : ")
    return ErrorException(msg)
end
208

209
# Read a CapturedException written by the matching `serialize` method. If the
# backtrace or the wrapped exception cannot be deserialized, a
# `CompositeException` is thrown that pairs a placeholder built by
# `original_ex` with the deserialization error itself.
function deserialize(s::ClusterSerializer, t::Type{<:CapturedException})
    ex_str = deserialize(s)

    bt = try
        deserialize(s)
    catch e
        throw(CompositeException([
            original_ex(s, ex_str, false),
            CapturedException(e, catch_backtrace())
        ]))
    end

    capex = try
        deserialize(s)
    catch e
        # The remote backtrace was read successfully, so attach it to the placeholder.
        throw(CompositeException([
            CapturedException(original_ex(s, ex_str, true), bt),
            CapturedException(e, catch_backtrace())
        ]))
    end

    return CapturedException(capex, bt)
end
233

234
"""
    clear!(syms, pids=workers(); mod=Main)

Clears global bindings in modules by initializing them to `nothing`.
`syms` should be of type [`Symbol`](@ref) or a collection of `Symbol`s. `pids` and `mod`
identify the processes and the module in which global variables are to be
reinitialized. Only those names found to be defined under `mod` are cleared.

An exception is raised if a global constant is requested to be cleared.
"""
function clear!(syms, pids=workers(); mod=Main)
    # Issue the remote calls concurrently and wait for all of them to finish.
    @sync for pid in pids
        @async_unwrap remotecall_wait(clear_impl!, pid, syms, mod)
    end
    return nothing
end
249
# Convenience methods: wrap a single symbol and/or a single process id in a
# one-element vector and forward to the generic `clear!` method.
function clear!(sym::Symbol, pid::Int; mod=Main)
    return clear!([sym], [pid]; mod)
end

function clear!(sym::Symbol, pids=workers(); mod=Main)
    return clear!([sym], pids; mod)
end

function clear!(syms, pid::Int; mod=Main)
    return clear!(syms, [pid]; mod)
end
2✔
252

253
# Clear every entry of the collection `syms` in module `mod`; returns `nothing`.
function clear_impl!(syms, mod::Module)
    for name in syms
        clear_impl!(name, mod)
    end
    return nothing
end
30✔
254
# Set the global `sym` in `mod` to `nothing` if it is defined there.
# Returns `false` when `sym` is not defined in `mod`.
function clear_impl!(sym::Symbol, mod::Module)
    isdefined(mod, sym) || return false
    return @eval(mod, global $sym = nothing)
end
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc