# This file is a part of Julia. License is MIT: https://julialang.org/license

"""
Provide the [`SharedArray`](@ref) type. It represents an array that is shared
across multiple processes on a single machine.
"""
module SharedArrays

using Mmap, Distributed, Random

import Base: length, size, elsize, ndims, IndexStyle, reshape, convert, deepcopy_internal,
             show, getindex, setindex!, fill!, similar, reduce, map!, copyto!, cconvert
import Base: Array
import Random
using Serialization
using Serialization: serialize_cycle_header, serialize_type, writetag, UNDEFREF_TAG, serialize, deserialize
import Serialization: serialize, deserialize
import Distributed: RRID, procs, remotecall_fetch
import Base.Filesystem: JL_O_CREAT, JL_O_RDWR, S_IRUSR, S_IWUSR

export SharedArray, SharedVector, SharedMatrix, sdata, indexpids, localindices

mutable struct SharedArray{T,N} <: DenseArray{T,N}
    id::RRID
    dims::NTuple{N,Int}
    pids::Vector{Int}
    refs::Vector

    # The segname is currently used only in the test scripts to ensure that
    # the shmem segment has been unlinked.
    segname::String

    # Fields below are not to be serialized.
    # Local shmem map.
    s::Array{T,N}

    # Index of the current worker's pid in the pids vector, or 0 if this shared
    # array is not mapped locally.
    pidx::Int

    # The local partition of the array when viewed as a one-dimensional array.
    # This can be removed when @distributed or its equivalent supports looping
    # on a subset of workers.
    loc_subarr_1d::SubArray{T,1,Array{T,1},Tuple{UnitRange{Int}},true}

    function SharedArray{T,N}(d,p,r,sn,s) where {T,N}
        S = new(RRID(),d,p,r,sn,s,0,view(Array{T}(undef, ntuple(d->0,N)), 1:0))
        sa_refs[S.id] = WeakRef(S)
        S
    end
end

const sa_refs = Dict{RRID, WeakRef}()

"""
54
    SharedArray{T}(dims::NTuple; init=false, pids=Int[])
55
    SharedArray{T,N}(...)
56

57
Construct a `SharedArray` of a bits type `T` and size `dims` across the
58
processes specified by `pids` - all of which have to be on the same
59
host.  If `N` is specified by calling `SharedArray{T,N}(dims)`, then
60
`N` must match the length of `dims`.
61

62
If `pids` is left unspecified, the shared array will be mapped across all processes on the
63
current host, including the master. But, `localindices` and `indexpids` will only refer to
64
worker processes. This facilitates work distribution code to use workers for actual
65
computation with the master process acting as a driver.
66

67
If an `init` function of the type `initfn(S::SharedArray)` is specified, it is called on all
68
the participating workers.
69

70
The shared array is valid as long as a reference to the `SharedArray` object exists on the node
71
which created the mapping.
72

73
    SharedArray{T}(filename::AbstractString, dims::NTuple, [offset=0]; mode=nothing, init=false, pids=Int[])
74
    SharedArray{T,N}(...)
75

76
Construct a `SharedArray` backed by the file `filename`, with element
77
type `T` (must be a bits type) and size `dims`, across the processes
78
specified by `pids` - all of which have to be on the same host. This
79
file is mmapped into the host memory, with the following consequences:
80

81
- The array data must be represented in binary format (e.g., an ASCII
82
  format like CSV cannot be supported)
83

84
- Any changes you make to the array values (e.g., `A[3] = 0`) will
85
  also change the values on disk
86

87
If `pids` is left unspecified, the shared array will be mapped across
88
all processes on the current host, including the master. But,
89
`localindices` and `indexpids` will only refer to worker
90
processes. This facilitates work distribution code to use workers for
91
actual computation with the master process acting as a driver.
92

93
`mode` must be one of `"r"`, `"r+"`, `"w+"`, or `"a+"`, and defaults
94
to `"r+"` if the file specified by `filename` already exists, or
95
`"w+"` if not. If an `init` function of the type
96
`initfn(S::SharedArray)` is specified, it is called on all the
97
participating workers. You cannot specify an `init` function if the
98
file is not writable.
99

100
`offset` allows you to skip the specified number of bytes at the
101
beginning of the file.
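
# Examples

A minimal usage sketch (illustrative; each worker fills the block of the
array that `localindices` assigns to it):

    using Distributed
    addprocs(4)
    @everywhere using SharedArrays

    S = SharedArray{Float64}((1000,); init = A -> A[localindices(A)] .= myid())
    sum(S)  # the segment is mapped into every participating process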
"""
SharedArray

function SharedArray{T,N}(dims::Dims{N}; init=false, pids=Int[]) where {T,N}
    isbitstype(T) || throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))

    pids, onlocalhost = shared_pids(pids)

    local shm_seg_name = ""
    local s = Array{T}(undef, ntuple(d->0,N))
    local S
    local shmmem_create_pid
    try
        # On OSX, the shm_seg_name length must be <= 31 characters (including the terminating NULL character)
        seg_name = "/jl$(lpad(string(getpid() % 10^6), 6, "0"))$(randstring(20))"
        shm_seg_name = seg_name
        if onlocalhost
            shmmem_create_pid = myid()
            s = shm_mmap_array(T, dims, seg_name, JL_O_CREAT | JL_O_RDWR)
        else
            # The shared array is created on a remote machine
            shmmem_create_pid = pids[1]
            remotecall_fetch(pids[1]) do
                shm_mmap_array(T, dims, seg_name, JL_O_CREAT | JL_O_RDWR)
                nothing
            end
        end

        func_mapshmem = () -> shm_mmap_array(T, dims, seg_name, JL_O_RDWR)

        refs = Vector{Future}(undef, length(pids))
        for (i, p) in enumerate(pids)
            refs[i] = remotecall(func_mapshmem, p)
        end

        # Wait till all the workers have mapped the segment
        for ref in refs
            wait(ref)
        end

        # All good, immediately unlink the segment.
        if (prod(dims) > 0) && (sizeof(T) > 0)
            if onlocalhost
                rc = shm_unlink(seg_name)
            else
                rc = remotecall_fetch(shm_unlink, shmmem_create_pid, seg_name)
            end
            systemerror("Error unlinking shmem segment " * seg_name, rc != 0)
        end
        S = SharedArray{T,N}(dims, pids, refs, seg_name, s)
        initialize_shared_array(S, onlocalhost, init, pids)
        shm_seg_name = ""

    finally
        if !isempty(shm_seg_name)
            remotecall_fetch(shm_unlink, shmmem_create_pid, shm_seg_name)
        end
    end
    S
end

SharedArray{T,N}(I::Integer...; kwargs...) where {T,N} =
    SharedArray{T,N}(I; kwargs...)
SharedArray{T}(d::NTuple; kwargs...) where {T} =
    SharedArray{T,length(d)}(d; kwargs...)
SharedArray{T}(I::Integer...; kwargs...) where {T} =
    SharedArray{T,length(I)}(I; kwargs...)
SharedArray{T}(m::Integer; kwargs...) where {T} =
    SharedArray{T,1}(m; kwargs...)
SharedArray{T}(m::Integer, n::Integer; kwargs...) where {T} =
    SharedArray{T,2}(m, n; kwargs...)
SharedArray{T}(m::Integer, n::Integer, o::Integer; kwargs...) where {T} =
    SharedArray{T,3}(m, n, o; kwargs...)

function SharedArray{T,N}(filename::AbstractString, dims::NTuple{N,Int}, offset::Integer=0;
                          mode=nothing, init=false, pids::Vector{Int}=Int[]) where {T,N}
    if !isabspath(filename)
        throw(ArgumentError("$filename is not an absolute path; try abspath(filename)?"))
    end
    if !isbitstype(T)
        throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))
    end

    pids, onlocalhost = shared_pids(pids)

    # If not supplied, determine the appropriate mode
    have_file = onlocalhost ? isfile(filename) : remotecall_fetch(isfile, pids[1], filename)
    mode_val = mode === nothing ? (have_file ? "r+" : "w+") : mode
    workermode = mode_val == "w+" ? "r+" : mode_val  # workers don't truncate!

    # Ensure the file will be readable
    if !(mode_val in ("r", "r+", "w+", "a+"))
        throw(ArgumentError("mode must be readable, but $mode_val is not"))
    end
    if init !== false
        typeassert(init, Function)
        if !(mode_val in ("r+", "w+", "a+"))
            throw(ArgumentError("cannot initialize unwritable array (mode = $mode_val)"))
        end
    end
    if mode_val == "r" && !isfile(filename)
        throw(ArgumentError("file $filename does not exist, but mode $mode_val cannot create it"))
    end

    # Create the file if it doesn't exist, map it if it does
    refs = Vector{Future}(undef, length(pids))
    func_mmap = mode -> open(filename, mode) do io
        mmap(io, Array{T,N}, dims, offset; shared=true)
    end
    s = Array{T}(undef, ntuple(d->0,N))
    if onlocalhost
        s = func_mmap(mode_val)
        refs[1] = remotecall(pids[1]) do
            func_mmap(workermode)
        end
    else
        refs[1] = remotecall_wait(pids[1]) do
            func_mmap(mode_val)
        end
    end

    # Populate the rest of the workers
    for i = 2:length(pids)
        refs[i] = remotecall(pids[i]) do
            func_mmap(workermode)
        end
    end

    # Wait till all the workers have mapped the segment
    for ref in refs
        wait(ref)
    end

    S = SharedArray{T,N}(dims, pids, refs, filename, s)
    initialize_shared_array(S, onlocalhost, init, pids)
    S
end

SharedArray{T}(filename::AbstractString, dims::NTuple{N,Int}, offset::Integer=0;
               mode=nothing, init=false, pids::Vector{Int}=Int[]) where {T,N} =
    SharedArray{T,N}(filename, dims, offset; mode=mode, init=init, pids=pids)

function initialize_shared_array(S, onlocalhost, init, pids)
    if onlocalhost
        init_loc_flds(S)
    else
        S.pidx = 0
    end

    # If present, the init function is called on each of the parts
    if isa(init, Function)
        @sync begin
            for p in pids
                @async remotecall_wait(init, p, S)
            end
        end
    end

    finalizer(finalize_refs, S)
    S
end

function finalize_refs(S::SharedArray{T,N}) where T where N
    if length(S.pids) > 0
        for r in S.refs
            finalize(r)
        end
        empty!(S.pids)
        empty!(S.refs)
        init_loc_flds(S)
        S.s = Array{T}(undef, ntuple(d->0,N))
        delete!(sa_refs, S.id)
    end
    S
end

"""
    SharedVector

A one-dimensional [`SharedArray`](@ref).
"""
const SharedVector{T} = SharedArray{T,1}
"""
    SharedMatrix

A two-dimensional [`SharedArray`](@ref).
"""
const SharedMatrix{T} = SharedArray{T,2}

SharedVector(A::Vector) = SharedArray(A)
SharedMatrix(A::Matrix) = SharedArray(A)

size(S::SharedArray) = S.dims
elsize(::Type{SharedArray{T,N}}) where {T,N} = elsize(Array{T,N}) # aka fieldtype(T, :s)
IndexStyle(::Type{<:SharedArray}) = IndexLinear()

function local_array_by_id(refid)
    if isa(refid, Future)
        refid = remoteref_id(refid)
    end
    fetch(channel_from_id(refid))
end

function reshape(a::SharedArray{T}, dims::NTuple{N,Int}) where {T,N}
    if length(a) != prod(dims)
        throw(DimensionMismatch("dimensions must be consistent with array size"))
    end
    refs = Vector{Future}(undef, length(a.pids))
    for (i, p) in enumerate(a.pids)
        refs[i] = remotecall(p, a.refs[i], dims) do r, d
            reshape(local_array_by_id(r), d)
        end
    end

    A = SharedArray{T,N}(dims, a.pids, refs, a.segname, reshape(a.s, dims))
    init_loc_flds(A)
    A
end

"""
    procs(S::SharedArray)

Get the vector of processes mapping the shared array.
"""
procs(S::SharedArray) = S.pids

"""
    indexpids(S::SharedArray)

Return the current worker's index in the list of workers
mapping the `SharedArray` (i.e. in the same list returned by `procs(S)`), or
0 if the `SharedArray` is not mapped locally.
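
# Examples

An illustrative sketch (`my_kernel` is a hypothetical helper to be run on
each worker):

    function my_kernel(S::SharedArray)
        idx = indexpids(S)          # this worker's position in procs(S)
        idx == 0 && return nothing  # not mapped locally; nothing to do
        # ... operate on the block of S assigned to worker idx ...
    end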
"""
indexpids(S::SharedArray) = S.pidx

"""
    sdata(S::SharedArray)

Return the actual `Array` object backing `S`.
"""
sdata(S::SharedArray) = S.s
sdata(A::AbstractArray) = A

"""
    localindices(S::SharedArray)

Return a range describing the "default" indices to be handled by the
current process.  This range should be interpreted in the sense of
linear indexing, i.e., as a sub-range of `1:length(S)`.  In
multi-process contexts, returns an empty range in the parent process
(or any process for which [`indexpids`](@ref) returns 0).

It's worth emphasizing that `localindices` exists purely as a
convenience, and you can partition work on the array among workers any
way you wish. For a `SharedArray`, all indices should be equally fast
for each worker process.
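
# Examples

A work-partitioning sketch (illustrative; assumes worker processes have been
added and `S` is a `SharedArray` mapped across them):

    @sync for p in procs(S)
        @async remotecall_wait(p, S) do A
            for i in localindices(A)
                A[i] = i  # each worker writes only its own default block
            end
        end
    end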
"""
localindices(S::SharedArray) = S.pidx > 0 ? range_1dim(S, S.pidx) : 1:0

cconvert(::Type{Ptr{T}}, S::SharedArray{T}) where {T} = cconvert(Ptr{T}, sdata(S))
cconvert(::Type{Ptr{T}}, S::SharedArray   ) where {T} = cconvert(Ptr{T}, sdata(S))

function SharedArray(A::Array)
    S = SharedArray{eltype(A),ndims(A)}(size(A))
    copyto!(S, A)
end
function SharedArray{T}(A::Array) where T
    S = SharedArray{T,ndims(A)}(size(A))
    copyto!(S, A)
end
function SharedArray{TS,N}(A::Array{TA,N}) where {TS,TA,N}
    S = SharedArray{TS,ndims(A)}(size(A))
    copyto!(S, A)
end

convert(T::Type{<:SharedArray}, a::Array) = T(a)::T

function deepcopy_internal(S::SharedArray, stackdict::IdDict)
    haskey(stackdict, S) && return stackdict[S]
    R = SharedArray{eltype(S),ndims(S)}(size(S); pids = S.pids)
    copyto!(sdata(R), sdata(S))
    stackdict[S] = R
    return R
end

function shared_pids(pids)
    if isempty(pids)
        # only use workers on the current host
        pids = procs(myid())
        if length(pids) > 1
            pids = filter(!=(1), pids)
        end

        onlocalhost = true
    else
        if !check_same_host(pids)
            throw(ArgumentError("SharedArray requires all requested processes to be on the same machine."))
        end

        onlocalhost = myid() in procs(pids[1])
    end
    pids, onlocalhost
end

function range_1dim(S::SharedArray, pidx)
    l = length(S)
    nw = length(S.pids)
    partlen = div(l, nw)

    if l < nw
        if pidx <= l
            return pidx:pidx
        else
            return 1:0
        end
    elseif pidx == nw
        return (((pidx-1) * partlen) + 1):l
    else
        return (((pidx-1) * partlen) + 1):(pidx*partlen)
    end
end
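# Worked example (illustrative): with length(S) == 10 and 3 mapped pids,
# partlen == div(10, 3) == 3, so the pieces are 1:3, 4:6, and 7:10; the last
# worker absorbs the remainder. With more pids than elements, each of the
# first `l` workers gets a single index and the rest get the empty range 1:0.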

sub_1dim(S::SharedArray, pidx) = view(S.s, range_1dim(S, pidx))

function init_loc_flds(S::SharedArray{T,N}, empty_local=false) where T where N
    if myid() in S.pids
        S.pidx = findfirst(isequal(myid()), S.pids)
        S.s = local_array_by_id(S.refs[S.pidx])
        S.loc_subarr_1d = sub_1dim(S, S.pidx)
    else
        S.pidx = 0
        if empty_local
            S.s = Array{T}(undef, ntuple(d->0,N))
        end
        S.loc_subarr_1d = view(Array{T}(undef, ntuple(d->0,N)), 1:0)
    end
end


# Don't serialize s (it is the complete array) and
# pidx, which is relevant to the current process only
function serialize(s::AbstractSerializer, S::SharedArray)
    serialize_cycle_header(s, S) && return

    destpid = worker_id_from_socket(s.io)
    if S.id.whence == destpid
        # The shared array was created from destpid, hence a reference to it
        # must be available at destpid.
        serialize(s, true)
        serialize(s, S.id.whence)
        serialize(s, S.id.id)
        return
    end
    serialize(s, false)
    for n in fieldnames(SharedArray)
        if n in [:s, :pidx, :loc_subarr_1d]
            writetag(s.io, UNDEFREF_TAG)
        elseif n === :refs
            v = getfield(S, n)
            if isa(v[1], Future)
                # convert to ids to avoid distributed GC overhead
                ids = [remoteref_id(x) for x in v]
                serialize(s, ids)
            else
                serialize(s, v)
            end
        else
            serialize(s, getfield(S, n))
        end
    end
end

function deserialize(s::AbstractSerializer, t::Type{<:SharedArray})
    ref_exists = deserialize(s)
    if ref_exists
        sref = sa_refs[RRID(deserialize(s), deserialize(s))]
        if sref.value !== nothing
            return sref.value
        end
        error("Expected reference to shared array instance not found")
    end

    S = invoke(deserialize, Tuple{AbstractSerializer,DataType}, s, t)
    init_loc_flds(S, true)
    return S
end

function show(io::IO, S::SharedArray)
    if length(S.s) > 0
        invoke(show, Tuple{IO,DenseArray}, io, S)
    else
        show(io, remotecall_fetch(sharr->sharr.s, S.pids[1], S))
    end
end

function show(io::IO, mime::MIME"text/plain", S::SharedArray)
    if length(S.s) > 0
        invoke(show, Tuple{IO,MIME"text/plain",DenseArray}, io, MIME"text/plain"(), S)
    else
        # retrieve from the first worker mapping the array.
        summary(io, S); println(io, ":")
        Base.print_array(io, remotecall_fetch(sharr->sharr.s, S.pids[1], S))
    end
end

Array(S::SharedArray) = S.s

# pass through getindex and setindex! - unlike DArrays, these always work on the complete array
Base.@propagate_inbounds getindex(S::SharedArray, i::Real) = getindex(S.s, i)

Base.@propagate_inbounds setindex!(S::SharedArray, x, i::Real) = setindex!(S.s, x, i)

function fill!(S::SharedArray, v)
    vT = convert(eltype(S), v)
    f = S->fill!(S.loc_subarr_1d, vT)
    @sync for p in procs(S)
        @async remotecall_wait(f, p, S)
    end
    return S
end

function Random.rand!(S::SharedArray{T}) where T
    f = S->map!(x -> rand(T), S.loc_subarr_1d, S.loc_subarr_1d)
    @sync for p in procs(S)
        @async remotecall_wait(f, p, S)
    end
    return S
end

function Random.randn!(S::SharedArray)
    f = S->map!(x -> randn(), S.loc_subarr_1d, S.loc_subarr_1d)
    @sync for p in procs(S)
        @async remotecall_wait(f, p, S)
    end
    return S
end

# convenience constructors
function shmem_fill(v, dims; kwargs...)
    SharedArray{typeof(v),length(dims)}(dims; init = S->fill!(S.loc_subarr_1d, v), kwargs...)
end
shmem_fill(v, I::Int...; kwargs...) = shmem_fill(v, I; kwargs...)
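# For example (illustrative), `shmem_fill(1.0, 3, 4)` creates a 3×4 shared
# array with every element set to 1.0 across the default set of workers.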

# rand variant with range
function shmem_rand(TR::Union{DataType, UnitRange}, dims; kwargs...)
    if isa(TR, UnitRange)
        SharedArray{Int,length(dims)}(dims; init = S -> map!(x -> rand(TR), S.loc_subarr_1d, S.loc_subarr_1d), kwargs...)
    else
        SharedArray{TR,length(dims)}(dims; init = S -> map!(x -> rand(TR), S.loc_subarr_1d, S.loc_subarr_1d), kwargs...)
    end
end
shmem_rand(TR::Union{DataType, UnitRange}, i::Int; kwargs...) = shmem_rand(TR, (i,); kwargs...)
shmem_rand(TR::Union{DataType, UnitRange}, I::Int...; kwargs...) = shmem_rand(TR, I; kwargs...)

shmem_rand(dims; kwargs...) = shmem_rand(Float64, dims; kwargs...)
shmem_rand(I::Int...; kwargs...) = shmem_rand(I; kwargs...)
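# For example (illustrative), `shmem_rand(1:10, 100)` creates a length-100
# shared array of `Int`s drawn uniformly from 1:10, while `shmem_rand(4, 4)`
# creates a 4×4 shared array of uniform Float64s.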

function shmem_randn(dims; kwargs...)
    SharedArray{Float64,length(dims)}(dims; init = S-> map!(x -> randn(), S.loc_subarr_1d, S.loc_subarr_1d), kwargs...)
end
shmem_randn(I::Int...; kwargs...) = shmem_randn(I; kwargs...)

similar(S::SharedArray, T::Type, dims::Dims) = similar(S.s, T, dims)
similar(S::SharedArray, T::Type) = similar(S.s, T, size(S))
similar(S::SharedArray, dims::Dims) = similar(S.s, eltype(S), dims)
similar(S::SharedArray) = similar(S.s, eltype(S), size(S))

reduce(f, S::SharedArray) =
    mapreduce(fetch, f, Any[ @spawnat p reduce(f, S.loc_subarr_1d) for p in procs(S) ])
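# For example (illustrative), `reduce(+, S)` reduces each worker's local block
# on that worker and then combines the partial results with `+`; `f` should
# therefore be associative for the result to be well defined.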

reduce(::typeof(vcat), S::SharedVector) = invoke(reduce, Tuple{Any,SharedArray}, vcat, S)
reduce(::typeof(hcat), S::SharedVector) = invoke(reduce, Tuple{Any,SharedArray}, hcat, S)

function map!(f, S::SharedArray, Q::SharedArray)
    if (S !== Q) && (procs(S) != procs(Q) || localindices(S) != localindices(Q))
        throw(ArgumentError("incompatible source and destination arguments"))
    end
    @sync for p in procs(S)
        @spawnat p begin
            for idx in localindices(S)
                S.s[idx] = f(Q.s[idx])
            end
        end
    end
    return S
end

copyto!(S::SharedArray, A::Array) = (copyto!(S.s, A); S)

function copyto!(S::SharedArray, R::SharedArray)
    length(S) == length(R) || throw(BoundsError())
    ps = intersect(procs(S), procs(R))
    isempty(ps) && throw(ArgumentError("source and destination arrays don't share any process"))
    l = length(S)
    length(ps) > l && (ps = ps[1:l])
    nw = length(ps)
    partlen = div(l, nw)

    @sync for i = 1:nw
        p = ps[i]
        idx = i < nw ?  ((i-1)*partlen+1:i*partlen) : ((i-1)*partlen+1:l)
        @spawnat p begin
            S.s[idx] = R.s[idx]
        end
    end

    return S
end

function print_shmem_limits(slen)
    try
        if Sys.islinux()
            pfx = "kernel"
        elseif Sys.isapple()
            pfx = "kern.sysv"
        elseif Sys.KERNEL === :FreeBSD || Sys.KERNEL === :DragonFly
            pfx = "kern.ipc"
        elseif Sys.KERNEL === :OpenBSD
            pfx = "kern.shminfo"
        else
            # NetBSD does not seem to have *.shmall
            return
        end

        shmmax_MB = div(parse(Int, split(read(`sysctl $(pfx).shmmax`, String))[end]), 1024*1024)
        page_size = parse(Int, split(read(`getconf PAGE_SIZE`, String))[end])
        shmall_MB = div(parse(Int, split(read(`sysctl $(pfx).shmall`, String))[end]) * page_size, 1024*1024)

        println("System max size of single shmem segment(MB) : ", shmmax_MB,
            "\nSystem max size of all shmem segments(MB) : ", shmall_MB,
            "\nRequested size(MB) : ", div(slen, 1024*1024),
            "\nPlease ensure requested size is within system limits.",
            "\nIf not, increase system limits and try again."
        )
    catch e
        nothing # Ignore any errors here
    end
end

# utilities
function shm_mmap_array(T, dims, shm_seg_name, mode)
    local s = nothing
    local A = nothing

    if (prod(dims) == 0) || (sizeof(T) == 0)
        return Array{T}(undef, dims)
    end

    try
        A = _shm_mmap_array(T, dims, shm_seg_name, mode)
    catch
        print_shmem_limits(prod(dims)*sizeof(T))
        rethrow()

    finally
        if s !== nothing
            close(s)
        end
    end
    A
end


# platform-specific code

if Sys.iswindows()
function _shm_mmap_array(T, dims, shm_seg_name, mode)
    readonly = !((mode & JL_O_RDWR) == JL_O_RDWR)
    create = (mode & JL_O_CREAT) == JL_O_CREAT
    s = Mmap.Anonymous(shm_seg_name, readonly, create)
    mmap(s, Array{T,length(dims)}, dims, zero(Int64))
end

# no-op in windows
shm_unlink(shm_seg_name) = 0

else # !windows
function _shm_mmap_array(T, dims, shm_seg_name, mode)
    fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR)
    systemerror("shm_open() failed for " * shm_seg_name, fd_mem < 0)

    s = fdio(fd_mem, true)

    # On OSX, ftruncate must be used to set the size of the segment;
    # lseek alone does not work, and it is done only at creation time.
    if (mode & JL_O_CREAT) == JL_O_CREAT
        rc = ccall(:jl_ftruncate, Cint, (Cint, Int64), fd_mem, prod(dims)*sizeof(T))
        systemerror("ftruncate() failed for shm segment " * shm_seg_name, rc != 0)
    end

    mmap(s, Array{T,length(dims)}, dims, zero(Int64); grow=false)
end

shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Cstring,), shm_seg_name)
function shm_open(shm_seg_name, oflags, permissions)
    # On macOS, `shm_open()` is a variadic function, so to properly match
    # the calling ABI, we must declare our arguments as variadic as well.
    @static if Sys.isapple()
        return ccall(:shm_open, Cint, (Cstring, Cint, Base.Cmode_t...), shm_seg_name, oflags, permissions)
    else
        return ccall(:shm_open, Cint, (Cstring, Cint, Base.Cmode_t), shm_seg_name, oflags, permissions)
    end
end
end # os-test

end # module