• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37433

pending completion
#37433

push

local

web-flow
Merge pull request #48513 from JuliaLang/jn/extend-once

ensure extension triggers are only run by the package that satisfied them

60 of 60 new or added lines in 1 file covered. (100.0%)

72324 of 82360 relevant lines covered (87.81%)

31376331.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.27
/stdlib/SharedArrays/src/SharedArrays.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
Provide the [`SharedArray`](@ref) type, which represents an array that is
shared across multiple processes on a single machine.
"""
module SharedArrays
7

8
using Mmap, Distributed, Random
9

10
import Base: length, size, elsize, ndims, IndexStyle, reshape, convert, deepcopy_internal,
11
             show, getindex, setindex!, fill!, similar, reduce, map!, copyto!, unsafe_convert
12
import Random
13
using Serialization
14
using Serialization: serialize_cycle_header, serialize_type, writetag, UNDEFREF_TAG, serialize, deserialize
15
import Serialization: serialize, deserialize
16
import Distributed: RRID, procs, remotecall_fetch
17
import Base.Filesystem: JL_O_CREAT, JL_O_RDWR, S_IRUSR, S_IWUSR
18

19
export SharedArray, SharedVector, SharedMatrix, sdata, indexpids, localindices
20

21
# A dense array backed by an OS shared-memory segment (or an mmapped file),
# mapped into every process listed in `pids` on a single host. Only metadata
# is serialized between processes; each process re-creates its local mapping
# (see `init_loc_flds` and `deserialize`).
mutable struct SharedArray{T,N} <: DenseArray{T,N}
    id::RRID
    dims::NTuple{N,Int}
    pids::Vector{Int}
    refs::Vector

    # The segname is currently used only in the test scripts to ensure that
    # the shmem segment has been unlinked.
    segname::String

    # Fields below are not to be serialized
    # Local shmem map.
    s::Array{T,N}

    # idx of current worker's pid in the pids vector, 0 if this shared array is not mapped locally.
    pidx::Int

    # the local partition into the array when viewed as a single dimensional array.
    # this can be removed when @distributed or its equivalent supports looping on
    # a subset of workers.
    loc_subarr_1d::SubArray{T,1,Array{T,1},Tuple{UnitRange{Int}},true}

    # Inner constructor: assigns a fresh RRID and registers the new instance
    # (weakly) in `sa_refs` so deserialization on this node can find it again.
    # `pidx` starts at 0 and `loc_subarr_1d` empty until `init_loc_flds` runs.
    function SharedArray{T,N}(d,p,r,sn,s) where {T,N}
        S = new(RRID(),d,p,r,sn,s,0,view(Array{T}(undef, ntuple(d->0,N)), 1:0))
        sa_refs[S.id] = WeakRef(S)
        S
    end
end
49

50
# Registry of live SharedArray instances keyed by RRID. Entries are WeakRefs
# so the registry does not keep arrays alive; `deserialize` consults it to
# reuse an existing local instance, and `finalize_refs` removes entries.
const sa_refs = Dict{RRID, WeakRef}()
51

52
"""
    SharedArray{T}(dims::NTuple; init=false, pids=Int[])
    SharedArray{T,N}(...)

Construct a `SharedArray` of a bits type `T` and size `dims` across the
processes specified by `pids` - all of which have to be on the same
host.  If `N` is specified by calling `SharedArray{T,N}(dims)`, then
`N` must match the length of `dims`.

If `pids` is left unspecified, the shared array will be mapped across all processes on the
current host, including the master. But, `localindices` and `indexpids` will only refer to
worker processes. This facilitates work distribution code to use workers for actual
computation with the master process acting as a driver.

If an `init` function of the type `initfn(S::SharedArray)` is specified, it is called on all
the participating workers.

The shared array is valid as long as a reference to the `SharedArray` object exists on the node
which created the mapping.

    SharedArray{T}(filename::AbstractString, dims::NTuple, [offset=0]; mode=nothing, init=false, pids=Int[])
    SharedArray{T,N}(...)

Construct a `SharedArray` backed by the file `filename`, with element
type `T` (must be a bits type) and size `dims`, across the processes
specified by `pids` - all of which have to be on the same host. This
file is mmapped into the host memory, with the following consequences:

- The array data must be represented in binary format (e.g., an ASCII
  format like CSV cannot be supported)

- Any changes you make to the array values (e.g., `A[3] = 0`) will
  also change the values on disk

If `pids` is left unspecified, the shared array will be mapped across
all processes on the current host, including the master. But,
`localindices` and `indexpids` will only refer to worker
processes. This facilitates work distribution code to use workers for
actual computation with the master process acting as a driver.

`mode` must be one of `"r"`, `"r+"`, `"w+"`, or `"a+"`, and defaults
to `"r+"` if the file specified by `filename` already exists, or
`"w+"` if not. If an `init` function of the type
`initfn(S::SharedArray)` is specified, it is called on all the
participating workers. You cannot specify an `init` function if the
file is not writable.

`offset` allows you to skip the specified number of bytes at the
beginning of the file.
"""
SharedArray
103

104
function SharedArray{T,N}(dims::Dims{N}; init=false, pids=Int[]) where {T,N}
    isbitstype(T) || throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))

    pids, onlocalhost = shared_pids(pids)

    # `shm_seg_name` doubles as the cleanup flag: it is reset to "" only after
    # everything succeeded, so a non-empty name in the `finally` block below
    # means we failed part-way and must unlink the segment ourselves.
    local shm_seg_name = ""
    local s = Array{T}(undef, ntuple(d->0,N))
    local S
    local shmmem_create_pid
    try
        # On OSX, the shm_seg_name length must be <= 31 characters (including the terminating NULL character)
        shm_seg_name = "/jl$(lpad(string(getpid() % 10^6), 6, "0"))$(randstring(20))"
        if onlocalhost
            shmmem_create_pid = myid()
            s = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR)
        else
            # The shared array is created on a remote machine
            shmmem_create_pid = pids[1]
            remotecall_fetch(pids[1]) do
                shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR)
                nothing
            end
        end

        # Each participating worker maps the (already created) segment read-write.
        func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR)

        refs = Vector{Future}(undef, length(pids))
        for (i, p) in enumerate(pids)
            refs[i] = remotecall(func_mapshmem, p)
        end

        # Wait till all the workers have mapped the segment
        for ref in refs
            wait(ref)
        end

        # All good, immediately unlink the segment.
        if (prod(dims) > 0) && (sizeof(T) > 0)
            if onlocalhost
                rc = shm_unlink(shm_seg_name)
            else
                rc = remotecall_fetch(shm_unlink, shmmem_create_pid, shm_seg_name)
            end
            systemerror("Error unlinking shmem segment " * shm_seg_name, rc != 0)
        end
        S = SharedArray{T,N}(dims, pids, refs, shm_seg_name, s)
        initialize_shared_array(S, onlocalhost, init, pids)
        shm_seg_name = ""

    finally
        if !isempty(shm_seg_name)
            # Construction failed before the normal unlink above ran.
            remotecall_fetch(shm_unlink, shmmem_create_pid, shm_seg_name)
        end
    end
    S
end
160

161
# Convenience forms: integer sizes and untyped tuples all funnel into the
# canonical `SharedArray{T,N}(dims::Dims{N}; ...)` constructor above.
function SharedArray{T,N}(I::Integer...; kwargs...) where {T,N}
    return SharedArray{T,N}(I; kwargs...)
end
function SharedArray{T}(d::NTuple; kwargs...) where {T}
    return SharedArray{T,length(d)}(d; kwargs...)
end
function SharedArray{T}(I::Integer...; kwargs...) where {T}
    return SharedArray{T,length(I)}(I; kwargs...)
end
function SharedArray{T}(m::Integer; kwargs...) where {T}
    return SharedArray{T,1}(m; kwargs...)
end
function SharedArray{T}(m::Integer, n::Integer; kwargs...) where {T}
    return SharedArray{T,2}(m, n; kwargs...)
end
function SharedArray{T}(m::Integer, n::Integer, o::Integer; kwargs...) where {T}
    return SharedArray{T,3}(m, n, o; kwargs...)
end
173

174
function SharedArray{T,N}(filename::AbstractString, dims::NTuple{N,Int}, offset::Integer=0;
                          mode=nothing, init=false, pids::Vector{Int}=Int[]) where {T,N}
    if !isabspath(filename)
        throw(ArgumentError("$filename is not an absolute path; try abspath(filename)?"))
    end
    if !isbitstype(T)
        throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))
    end

    pids, onlocalhost = shared_pids(pids)

    # If not supplied, determine the appropriate mode
    have_file = onlocalhost ? isfile(filename) : remotecall_fetch(isfile, pids[1], filename)
    if mode === nothing
        mode = have_file ? "r+" : "w+"
    end
    workermode = mode == "w+" ? "r+" : mode  # workers don't truncate!

    # Ensure the file will be readable
    if !(mode in ("r", "r+", "w+", "a+"))
        throw(ArgumentError("mode must be readable, but $mode is not"))
    end
    if init !== false
        typeassert(init, Function)
        # init writes to the array, so the mapping must be writable.
        if !(mode in ("r+", "w+", "a+"))
            throw(ArgumentError("cannot initialize unwritable array (mode = $mode)"))
        end
    end
    if mode == "r" && !isfile(filename)
        throw(ArgumentError("file $filename does not exist, but mode $mode cannot create it"))
    end

    # Create the file if it doesn't exist, map it if it does
    refs = Vector{Future}(undef, length(pids))
    func_mmap = mode -> open(filename, mode) do io
        mmap(io, Array{T,N}, dims, offset; shared=true)
    end
    s = Array{T}(undef, ntuple(d->0,N))
    if onlocalhost
        s = func_mmap(mode)
        # Workers open with `workermode` so they never truncate the file.
        refs[1] = remotecall(pids[1]) do
            func_mmap(workermode)
        end
    else
        # First worker performs the (possibly truncating) open on our behalf.
        refs[1] = remotecall_wait(pids[1]) do
            func_mmap(mode)
        end
    end

    # Populate the rest of the workers
    for i = 2:length(pids)
        refs[i] = remotecall(pids[i]) do
            func_mmap(workermode)
        end
    end

    # Wait till all the workers have mapped the segment
    for ref in refs
        wait(ref)
    end

    S = SharedArray{T,N}(dims, pids, refs, filename, s)
    initialize_shared_array(S, onlocalhost, init, pids)
    S
end
239

240
# Untyped-N file-backed form: infer `N` from the dims tuple and delegate.
function SharedArray{T}(filename::AbstractString, dims::NTuple{N,Int}, offset::Integer=0;
                        mode=nothing, init=false, pids::Vector{Int}=Int[]) where {T,N}
    return SharedArray{T,N}(filename, dims, offset; mode=mode, init=init, pids=pids)
end
243

244
# Shared tail of the constructors: set up the calling process's local fields,
# run the optional per-worker `init` function, and install the finalizer that
# releases the remote refs.
function initialize_shared_array(S, onlocalhost, init, pids)
    if onlocalhost
        init_loc_flds(S)
    else
        # The array is not mapped in this process.
        S.pidx = 0
    end

    # if present, init function is called on each of the parts
    if init isa Function
        @sync for p in pids
            @async remotecall_wait(init, p, S)
        end
    end

    finalizer(finalize_refs, S)
    return S
end
263

264
# Finalizer for SharedArray: drop the remote refs, clear the local mapping,
# and deregister the array from `sa_refs`. No-op once `pids` is empty, so it
# is safe to run more than once.
function finalize_refs(S::SharedArray{T,N}) where T where N
    if !isempty(S.pids)
        foreach(finalize, S.refs)
        empty!(S.pids)
        empty!(S.refs)
        init_loc_flds(S)
        S.s = Array{T}(undef, ntuple(d->0,N))
        delete!(sa_refs, S.id)
    end
    return S
end
277

278
"""
    SharedVector

A one-dimensional [`SharedArray`](@ref).
"""
const SharedVector{T} = SharedArray{T,1}
"""
    SharedMatrix

A two-dimensional [`SharedArray`](@ref).
"""
const SharedMatrix{T} = SharedArray{T,2}

# Construct from an existing Vector/Matrix by copying its contents.
SharedVector(A::Vector) = SharedArray(A)
SharedMatrix(A::Matrix) = SharedArray(A)
293

294
# AbstractArray interface: dimensions come from the metadata; element size and
# linear indexing delegate to the underlying Array.
size(S::SharedArray) = S.dims
elsize(::Type{SharedArray{T,N}}) where {T,N} = elsize(Array{T,N}) # aka fieldtype(T, :s)
IndexStyle(::Type{<:SharedArray}) = IndexLinear()
297

298
# Fetch the process-local Array associated with a remote ref id.
# Accepts either a Future (converted to its ref id) or a ref id directly.
function local_array_by_id(refid)
    rid = refid isa Future ? remoteref_id(refid) : refid
    return fetch(channel_from_id(rid))
end
304

305
# Reshape shares the underlying memory: every worker reshapes its local map,
# and the results are wrapped in a new SharedArray with the same segment name.
function reshape(a::SharedArray{T}, dims::NTuple{N,Int}) where {T,N}
    length(a) == prod(dims) ||
        throw(DimensionMismatch("dimensions must be consistent with array size"))
    refs = Future[remotecall(p, a.refs[i], dims) do r, d
                      reshape(local_array_by_id(r), d)
                  end
                  for (i, p) in enumerate(a.pids)]

    A = SharedArray{T,N}(dims, a.pids, refs, a.segname, reshape(a.s, dims))
    init_loc_flds(A)
    return A
end
320

321
"""
    procs(S::SharedArray)

Get the vector of processes mapping the shared array.
"""
procs(S::SharedArray) = S.pids

"""
    indexpids(S::SharedArray)

Return the current worker's index in the list of workers
mapping the `SharedArray` (i.e. in the same list returned by `procs(S)`), or
0 if the `SharedArray` is not mapped locally.
"""
indexpids(S::SharedArray) = S.pidx

"""
    sdata(S::SharedArray)

Return the actual `Array` object backing `S`.
"""
sdata(S::SharedArray) = S.s
# Fallback: a plain array is its own backing data.
sdata(A::AbstractArray) = A
344

345
"""
    localindices(S::SharedArray)

Return a range describing the "default" indices to be handled by the
current process.  This range should be interpreted in the sense of
linear indexing, i.e., as a sub-range of `1:length(S)`.  In
multi-process contexts, returns an empty range in the parent process
(or any process for which [`indexpids`](@ref) returns 0).

It's worth emphasizing that `localindices` exists purely as a
convenience, and you can partition work on the array among workers any
way you wish. For a `SharedArray`, all indices should be equally fast
for each worker process.
"""
localindices(S::SharedArray) = S.pidx > 0 ? range_1dim(S, S.pidx) : 1:0

# Pointer conversion goes through the local backing array.
unsafe_convert(::Type{Ptr{T}}, S::SharedArray{T}) where {T} = unsafe_convert(Ptr{T}, sdata(S))
unsafe_convert(::Type{Ptr{T}}, S::SharedArray   ) where {T} = unsafe_convert(Ptr{T}, sdata(S))
×
363

364
# Build a SharedArray from a regular Array, copying the data in.
function SharedArray(A::Array)
    return copyto!(SharedArray{eltype(A),ndims(A)}(size(A)), A)
end
# Same, with an explicit element type (elements are converted by copyto!).
function SharedArray{T}(A::Array) where T
    return copyto!(SharedArray{T,ndims(A)}(size(A)), A)
end
# Same, with explicit element type and dimensionality.
function SharedArray{TS,N}(A::Array{TA,N}) where {TS,TA,N}
    return copyto!(SharedArray{TS,ndims(A)}(size(A)), A)
end

# `convert` delegates to the constructors above.
convert(T::Type{<:SharedArray}, a::Array) = T(a)::T
378

379
# deepcopy of a SharedArray allocates a fresh shared segment over the same
# set of pids and copies the local data into it. `stackdict` preserves
# object identity across the enclosing deepcopy.
function deepcopy_internal(S::SharedArray, stackdict::IdDict)
    if haskey(stackdict, S)
        return stackdict[S]
    end
    dup = SharedArray{eltype(S),ndims(S)}(size(S); pids = S.pids)
    copyto!(sdata(dup), sdata(S))
    stackdict[S] = dup
    return dup
end
386

387
# Resolve the `pids` argument of the SharedArray constructors.
# Empty `pids` means "all processes on this host", dropping the master when
# workers exist. Returns `(pids, onlocalhost)`, where `onlocalhost` says
# whether the calling process is among those mapping the segment.
function shared_pids(pids)
    if isempty(pids)
        # only use workers on the current host
        pids = procs(myid())
        length(pids) > 1 && (pids = filter(!=(1), pids))
        onlocalhost = true
    else
        check_same_host(pids) ||
            throw(ArgumentError("SharedArray requires all requested processes to be on the same machine."))
        onlocalhost = myid() in procs(pids[1])
    end
    return pids, onlocalhost
end
405

406
# Linear-index partition of `S` for the worker at index `pidx` in `S.pids`.
# With fewer elements than workers, worker `pidx` gets element `pidx` (or an
# empty range). Otherwise equal-length chunks, the last absorbing the remainder.
function range_1dim(S::SharedArray, pidx)
    len = length(S)
    nchunks = length(S.pids)
    chunklen = div(len, nchunks)

    if len < nchunks
        return pidx <= len ? (pidx:pidx) : (1:0)
    end
    lo = (pidx - 1) * chunklen + 1
    hi = pidx == nchunks ? len : pidx * chunklen
    return lo:hi
end
423

424
# 1-d view of worker `pidx`'s partition of the local backing array.
sub_1dim(S::SharedArray, pidx) = view(S.s, range_1dim(S, pidx))
425

426
# (Re)initialize the process-local, non-serialized fields of `S`:
# pidx, the local shmem map `s`, and the 1-d local partition view.
# With `empty_local=true`, `s` is also reset when this process is not a mapper.
function init_loc_flds(S::SharedArray{T,N}, empty_local=false) where T where N
    idx = findfirst(isequal(myid()), S.pids)
    if idx === nothing
        # This process does not map the array.
        S.pidx = 0
        if empty_local
            S.s = Array{T}(undef, ntuple(d->0,N))
        end
        S.loc_subarr_1d = view(Array{T}(undef, ntuple(d->0,N)), 1:0)
    else
        S.pidx = idx
        S.s = local_array_by_id(S.refs[idx])
        S.loc_subarr_1d = sub_1dim(S, idx)
    end
end
439

440

441
# Don't serialize s (it is the complete array) and
# pidx, which is relevant to the current process only
function serialize(s::AbstractSerializer, S::SharedArray)
    serialize_cycle_header(s, S) && return

    destpid = worker_id_from_socket(s.io)
    if S.id.whence == destpid
        # The shared array was created from destpid, hence a reference to it
        # must be available at destpid. Send just the RRID.
        serialize(s, true)
        serialize(s, S.id.whence)
        serialize(s, S.id.id)
        return
    end
    serialize(s, false)
    for n in fieldnames(SharedArray)
        # Tuple membership test: the original allocated a fresh Vector{Symbol}
        # on every field iteration; a tuple is equivalent and allocation-free.
        if n in (:s, :pidx, :loc_subarr_1d)
            # Process-local fields are sent as "undefined"; the receiver
            # rebuilds them via init_loc_flds in deserialize.
            writetag(s.io, UNDEFREF_TAG)
        elseif n === :refs
            v = getfield(S, n)
            if isa(v[1], Future)
                # convert to ids to avoid distributed GC overhead
                ids = [remoteref_id(x) for x in v]
                serialize(s, ids)
            else
                serialize(s, v)
            end
        else
            serialize(s, getfield(S, n))
        end
    end
end
473

474
function deserialize(s::AbstractSerializer, t::Type{<:SharedArray})
    ref_exists = deserialize(s)
    if ref_exists
        # The sender says this node created the array: look it up by RRID.
        # Order matters: `whence` is serialized before `id`.
        whence = deserialize(s)
        id = deserialize(s)
        sref = sa_refs[RRID(whence, id)]
        sref.value === nothing &&
            error("Expected reference to shared array instance not found")
        return sref.value
    end

    # Full payload: read the metadata, then rebuild the local mapping.
    S = invoke(deserialize, Tuple{AbstractSerializer,DataType}, s, t)
    init_loc_flds(S, true)
    return S
end
488

489
function show(io::IO, S::SharedArray)
    if isempty(S.s)
        # Not mapped locally: fetch the data from the first mapping worker.
        show(io, remotecall_fetch(sh -> sh.s, S.pids[1], S))
    else
        invoke(show, Tuple{IO,DenseArray}, io, S)
    end
end
496

497
function show(io::IO, mime::MIME"text/plain", S::SharedArray)
    if isempty(S.s)
        # Not mapped locally: retrieve from the first worker mapping the array.
        summary(io, S)
        println(io, ":")
        Base.print_array(io, remotecall_fetch(sh -> sh.s, S.pids[1], S))
    else
        invoke(show, Tuple{IO,MIME"text/plain",DenseArray}, io, MIME"text/plain"(), S)
    end
end
506

507
# Returns the local backing array itself (not a copy).
Array(S::SharedArray) = S.s

# pass through getindex and setindex! - unlike DArrays, these always work on the complete array
Base.@propagate_inbounds getindex(S::SharedArray, i::Real) = getindex(S.s, i)

Base.@propagate_inbounds setindex!(S::SharedArray, x, i::Real) = setindex!(S.s, x, i)
513

514
# Set every element of `S` to `v`; each worker fills its own local partition.
function fill!(S::SharedArray, v)
    val = convert(eltype(S), v)
    filler = sh -> fill!(sh.loc_subarr_1d, val)
    @sync for p in procs(S)
        @async remotecall_wait(filler, p, S)
    end
    return S
end
522

523
# In-place uniform-random fill; each worker randomizes its own partition.
function Random.rand!(S::SharedArray{T}) where T
    randomize = sh -> map!(_ -> rand(T), sh.loc_subarr_1d, sh.loc_subarr_1d)
    @sync for p in procs(S)
        @async remotecall_wait(randomize, p, S)
    end
    return S
end
530

531
# In-place standard-normal fill; each worker randomizes its own partition.
function Random.randn!(S::SharedArray)
    randomize = sh -> map!(_ -> randn(), sh.loc_subarr_1d, sh.loc_subarr_1d)
    @sync for p in procs(S)
        @async remotecall_wait(randomize, p, S)
    end
    return S
end
538

539
# convenience constructors
540
function shmem_fill(v, dims; kwargs...)
14✔
541
    SharedArray{typeof(v),length(dims)}(dims; init = S->fill!(S.loc_subarr_1d, v), kwargs...)
31✔
542
end
543
shmem_fill(v, I::Int...; kwargs...) = shmem_fill(v, I; kwargs...)
4✔
544

545
# rand variant with range
# Allocate a SharedArray randomized in parallel. A UnitRange `TR` yields
# Int elements drawn from the range; a DataType yields elements of that type.
function shmem_rand(TR::Union{DataType, UnitRange}, dims; kwargs...)
    randomize = S -> map!(_ -> rand(TR), S.loc_subarr_1d, S.loc_subarr_1d)
    if TR isa UnitRange
        return SharedArray{Int,length(dims)}(dims; init = randomize, kwargs...)
    else
        return SharedArray{TR,length(dims)}(dims; init = randomize, kwargs...)
    end
end
shmem_rand(TR::Union{DataType, UnitRange}, i::Int; kwargs...) = shmem_rand(TR, (i,); kwargs...)
shmem_rand(TR::Union{DataType, UnitRange}, I::Int...; kwargs...) = shmem_rand(TR, I; kwargs...)

# Default element type is Float64.
shmem_rand(dims; kwargs...) = shmem_rand(Float64, dims; kwargs...)
shmem_rand(I::Int...; kwargs...) = shmem_rand(I; kwargs...)
558

559
# Standard-normal variant of shmem_rand (Float64 elements).
function shmem_randn(dims; kwargs...)
    randomize = S -> map!(_ -> randn(), S.loc_subarr_1d, S.loc_subarr_1d)
    return SharedArray{Float64,length(dims)}(dims; init = randomize, kwargs...)
end
shmem_randn(I::Int...; kwargs...) = shmem_randn(I; kwargs...)
563

564
# `similar` delegates to the local backing array `S.s`, so it returns a
# regular (non-shared) Array of the requested element type and size.
similar(S::SharedArray, T::Type, dims::Dims) = similar(S.s, T, dims)
similar(S::SharedArray, T::Type) = similar(S.s, T, size(S))
similar(S::SharedArray, dims::Dims) = similar(S.s, eltype(S), dims)
similar(S::SharedArray) = similar(S.s, eltype(S), size(S))
568

569
# Each worker reduces its own partition; the caller combines the partial
# results with `mapreduce(fetch, f, ...)`.
reduce(f, S::SharedArray) =
    mapreduce(fetch, f, Any[ @spawnat p reduce(f, S.loc_subarr_1d) for p in procs(S) ])

# Route vcat/hcat reductions on vectors to the generic method above.
reduce(::typeof(vcat), S::SharedVector) = invoke(reduce, Tuple{Any,SharedArray}, vcat, S)
reduce(::typeof(hcat), S::SharedVector) = invoke(reduce, Tuple{Any,SharedArray}, hcat, S)
574

575
# Parallel element-wise map `S[i] = f(Q[i])`; each worker handles its own
# local index range. Source and destination must share the same partition
# layout (or be the same array).
function map!(f, S::SharedArray, Q::SharedArray)
    if (S !== Q) && (procs(S) != procs(Q) || localindices(S) != localindices(Q))
        throw(ArgumentError("incompatible source and destination arguments"))
    end
    @sync for p in procs(S)
        @spawnat p begin
            for i in localindices(S)
                S.s[i] = f(Q.s[i])
            end
        end
    end
    return S
end
588

589
# Copy `A` into the local backing array of `S` and return `S`.
copyto!(S::SharedArray, A::Array) = (copyto!(S.s, A); S)
590

591
# Copy `R` into `S` in parallel over the processes the two arrays share.
function copyto!(S::SharedArray, R::SharedArray)
    length(S) == length(R) || throw(BoundsError())
    ps = intersect(procs(S), procs(R))
    isempty(ps) && throw(ArgumentError("source and destination arrays don't share any process"))
    total = length(S)
    length(ps) > total && (ps = ps[1:total])
    nchunks = length(ps)
    chunklen = div(total, nchunks)

    @sync for k = 1:nchunks
        # Equal chunks; the last one absorbs the remainder.
        rng = k < nchunks ? ((k-1)*chunklen+1:k*chunklen) : ((k-1)*chunklen+1:total)
        @spawnat ps[k] begin
            S.s[rng] = R.s[rng]
        end
    end

    return S
end
610

611
# Best-effort diagnostic: print the OS shared-memory limits alongside the
# requested size `slen` (bytes). Every failure (missing sysctl, parse error,
# unsupported OS, ...) is silently ignored; always returns `nothing`.
function print_shmem_limits(slen)
    try
        pfx = if Sys.islinux()
            "kernel"
        elseif Sys.isapple()
            "kern.sysv"
        elseif Sys.KERNEL === :FreeBSD || Sys.KERNEL === :DragonFly
            "kern.ipc"
        elseif Sys.KERNEL === :OpenBSD
            "kern.shminfo"
        else
            # seems NetBSD does not have *.shmall
            return
        end

        # Last whitespace-separated token of a command's output, as an Int.
        lastint(cmd) = parse(Int, split(read(cmd, String))[end])

        shmmax_MB = div(lastint(`sysctl $(pfx).shmmax`), 1024*1024)
        page_size = lastint(`getconf PAGE_SIZE`)
        shmall_MB = div(lastint(`sysctl $(pfx).shmall`) * page_size, 1024*1024)

        println("System max size of single shmem segment(MB) : ", shmmax_MB,
            "\nSystem max size of all shmem segments(MB) : ", shmall_MB,
            "\nRequested size(MB) : ", div(slen, 1024*1024),
            "\nPlease ensure requested size is within system limits.",
            "\nIf not, increase system limits and try again."
        )
    catch e
        nothing # Ignore any errors in this
    end
end
640

641
# utilities

# Map (or, with JL_O_CREAT in `mode`, create and map) the shared-memory
# segment `shm_seg_name` as an Array{T} of size `dims`. Zero-length arrays
# and zero-size element types need no segment and get a plain Array.
#
# NOTE(review): the original kept locals `s = nothing` / `A = nothing` with a
# `finally close(s)` — but `s` was never reassigned, so the cleanup was
# unreachable dead code (the fd is owned by `_shm_mmap_array`). Removed.
function shm_mmap_array(T, dims, shm_seg_name, mode)
    if (prod(dims) == 0) || (sizeof(T) == 0)
        return Array{T}(undef, dims)
    end

    try
        return _shm_mmap_array(T, dims, shm_seg_name, mode)
    catch
        # Failures here are commonly due to system shmem limits; print them
        # to help diagnosis, then propagate the original error.
        print_shmem_limits(prod(dims)*sizeof(T))
        rethrow()
    end
end
663

664

665
# platform-specific code
666

667
if Sys.iswindows()
# Windows: back the segment with an anonymous Mmap mapping keyed by name.
function _shm_mmap_array(T, dims, shm_seg_name, mode)
    readonly = !((mode & JL_O_RDWR) == JL_O_RDWR)
    create = (mode & JL_O_CREAT) == JL_O_CREAT
    s = Mmap.Anonymous(shm_seg_name, readonly, create)
    mmap(s, Array{T,length(dims)}, dims, zero(Int64))
end

# no-op in windows
shm_unlink(shm_seg_name) = 0
677

678
else # !windows
# POSIX: shm_open the named segment, size it at creation time, and mmap it.
function _shm_mmap_array(T, dims, shm_seg_name, mode)
    fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR)
    systemerror("shm_open() failed for " * shm_seg_name, fd_mem < 0)

    s = fdio(fd_mem, true)

    # On OSX, ftruncate must be used to set the size of the segment (just lseek
    # does not work), and only at creation time.
    if (mode & JL_O_CREAT) == JL_O_CREAT
        rc = ccall(:jl_ftruncate, Cint, (Cint, Int64), fd_mem, prod(dims)*sizeof(T))
        systemerror("ftruncate() failed for shm segment " * shm_seg_name, rc != 0)
    end

    mmap(s, Array{T,length(dims)}, dims, zero(Int64); grow=false)
end

shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Cstring,), shm_seg_name)
function shm_open(shm_seg_name, oflags, permissions)
    # On macOS, `shm_open()` is a variadic function, so to properly match
    # calling ABI, we must declare our arguments as variadic as well.
    @static if Sys.isapple()
        return ccall(:shm_open, Cint, (Cstring, Cint, Base.Cmode_t...), shm_seg_name, oflags, permissions)
    else
        return ccall(:shm_open, Cint, (Cstring, Cint, Base.Cmode_t), shm_seg_name, oflags, permissions)
    end
end
end # os-test
706

707
end # module
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc