
JuliaLang / julia, build #37585 (push, committed via web-flow): pending completion
SROA: don't use `unswitchtupleunion` and explicitly use type name only (#50522)

Construction of `UnionAll`s of `Union`s can be expensive. The SROA pass only needs to
look at type name information and does not need to propagate full type objects.

- xref: <https://github.com/JuliaLang/julia/pull/50511#issuecomment-1632384357>
- closes #50511

17 of 17 new or added lines in 3 files covered. (100.0%)

73874 of 84465 relevant lines covered (87.46%)

33502527.9 hits per line

Source File: /stdlib/Distributed/src/cluster.jl (74.0% covered)
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
"""
4
    ClusterManager
5

6
Supertype for cluster managers, which control worker processes as a cluster.
7
Cluster managers implement how workers can be added, removed and communicated with.
8
`SSHManager` and `LocalManager` are subtypes of this.
9
"""
10
abstract type ClusterManager end
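
# Example (illustrative sketch, not part of cluster.jl): a minimal custom
# manager is just a subtype carrying whatever launch parameters it needs.
# `ToyManager` is a hypothetical name; to be usable it must also extend
# `launch` and `manage` for this type.
struct ToyManager <: ClusterManager
    np::Int
end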
11

12
"""
13
    WorkerConfig
14

15
Type used by [`ClusterManager`](@ref)s to control workers added to their clusters. Some fields
16
are used by all cluster managers to access a host:
17
  * `io` -- the connection used to access the worker (a subtype of `IO` or `Nothing`)
18
  * `host` -- the host address (either a `String` or `Nothing`)
19
  * `port` -- the port on the host used to connect to the worker (either an `Int` or `Nothing`)
20

21
Some are used by the cluster manager to add workers to an already-initialized host:
22
  * `count` -- the number of workers to be launched on the host
23
  * `exename` -- the path to the Julia executable on the host, defaults to `"\$(Sys.BINDIR)/julia"` or
24
    `"\$(Sys.BINDIR)/julia-debug"`
25
  * `exeflags` -- flags to use when launching Julia remotely
26

27
The `userdata` field is used to store information for each worker by external managers.
28

29
Some fields are used by `SSHManager` and similar managers:
30
  * `tunnel` -- `true` (use tunneling), `false` (do not use tunneling), or [`nothing`](@ref) (use default for the manager)
31
  * `multiplex` -- `true` (use SSH multiplexing for tunneling) or `false`
32
  * `forward` -- the forwarding option used for `-L` option of ssh
33
  * `bind_addr` -- the address on the remote host to bind to
34
  * `sshflags` -- flags to use in establishing the SSH connection
35
  * `max_parallel` -- the maximum number of workers to connect to in parallel on the host
36

37
Some fields are used by both `LocalManager`s and `SSHManager`s:
38
  * `connect_at` -- determines whether this is a worker-to-worker or driver-to-worker setup call
39
  * `process` -- the process which will be connected (usually the manager will assign this during [`addprocs`](@ref))
40
  * `ospid` -- the process ID according to the host OS, used to interrupt worker processes
41
  * `environ` -- private dictionary used to store temporary information by Local/SSH managers
42
  * `ident` -- worker as identified by the [`ClusterManager`](@ref)
43
  * `connect_idents` -- list of worker ids the worker must connect to if using a custom topology
44
  * `enable_threaded_blas` -- `true`, `false`, or `nothing`, whether to use threaded BLAS or not on the workers
45
"""
46
mutable struct WorkerConfig
47
    # Common fields relevant to all cluster managers
48
    io::Union{IO, Nothing}
49
    host::Union{String, Nothing}
50
    port::Union{Int, Nothing}
51

52
    # Used when launching additional workers at a host
53
    count::Union{Int, Symbol, Nothing}
54
    exename::Union{String, Cmd, Nothing}
55
    exeflags::Union{Cmd, Nothing}
56

57
    # External cluster managers can use this to store information at a per-worker level
58
    # Can be a dict if multiple fields need to be stored.
59
    userdata::Any
60

61
    # SSHManager / SSH tunnel connections to workers
62
    tunnel::Union{Bool, Nothing}
63
    multiplex::Union{Bool, Nothing}
64
    forward::Union{String, Nothing}
65
    bind_addr::Union{String, Nothing}
66
    sshflags::Union{Cmd, Nothing}
67
    max_parallel::Union{Int, Nothing}
68

69
    # Used by Local/SSH managers
70
    connect_at::Any
71

72
    process::Union{Process, Nothing}
73
    ospid::Union{Int, Nothing}
74

75
    # Private dictionary used to store temporary information by Local/SSH managers.
76
    environ::Union{Dict, Nothing}
77

78
    # Connections to be setup depending on the network topology requested
79
    ident::Any      # Worker as identified by the Cluster Manager.
80
    # List of other worker idents this worker must connect with. Used with topology T_CUSTOM.
81
    connect_idents::Union{Array, Nothing}
82

83
    # Run multithreaded blas on worker
84
    enable_threaded_blas::Union{Bool, Nothing}
85

86
    function WorkerConfig()
707✔
87
        wc = new()
707✔
88
        for n in 1:fieldcount(WorkerConfig)
707✔
89
            setfield!(wc, n, nothing)
14,140✔
90
        end
27,573✔
91
        wc
707✔
92
    end
93
end
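
# Example (illustrative): a fresh WorkerConfig has every field set to
# `nothing`; a manager's `launch` fills in what its transport needs.
let wc = WorkerConfig()
    @assert wc.host === nothing
    wc.host = "127.0.0.1"
    wc.port = 9501
    wc.count = 1
end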
94

95
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
96
mutable struct Worker
97
    id::Int
98
    msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
99
    del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
100
    add_msgs::Array{Any,1}
101
    @atomic gcflag::Bool
102
    state::WorkerState
103
    c_state::Condition      # wait for state changes
104
    ct_time::Float64        # creation time
105
    conn_func::Any          # used to setup connections lazily
106

107
    r_stream::IO
108
    w_stream::IO
109
    w_serializer::ClusterSerializer  # writes can happen from any task hence store the
110
                                     # serializer as part of the Worker object
111
    manager::ClusterManager
112
    config::WorkerConfig
113
    version::Union{VersionNumber, Nothing}   # Julia version of the remote process
114
    initialized::Event
115

116
    function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager;
366✔
117
                             version::Union{VersionNumber, Nothing}=nothing,
118
                             config::WorkerConfig=WorkerConfig())
119
        w = Worker(id)
183✔
120
        w.r_stream = r_stream
366✔
121
        w.w_stream = buffer_writes(w_stream)
366✔
122
        w.w_serializer = ClusterSerializer(w.w_stream)
183✔
123
        w.manager = manager
366✔
124
        w.config = config
366✔
125
        w.version = version
366✔
126
        set_worker_state(w, W_CONNECTED)
366✔
127
        register_worker_streams(w)
366✔
128
        w
183✔
129
    end
130

131
    Worker(id::Int) = Worker(id, nothing)
758✔
132
    function Worker(id::Int, conn_func)
1,287✔
133
        @assert id > 0
1,287✔
134
        if haskey(map_pid_wrkr, id)
1,287✔
135
            return map_pid_wrkr[id]
95✔
136
        end
137
        w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
1,192✔
138
        w.initialized = Event()
1,192✔
139
        register_worker(w)
1,192✔
140
        w
1,192✔
141
    end
142

143
    Worker() = Worker(get_next_pid())
81✔
144
end
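
# Illustrative summary (added for exposition): the WorkerState lifecycle as
# driven by the functions below. A Worker starts in W_CREATED, is moved to
# W_CONNECTED by the stream-accepting constructor above, and passes through
# W_TERMINATING to W_TERMINATED during `rmprocs`-initiated shutdown.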
145

146
function set_worker_state(w, state)
307✔
147
    w.state = state
307✔
148
    notify(w.c_state; all=true)
307✔
149
end
150

151
function check_worker_state(w::Worker)
31,125✔
152
    if w.state === W_CREATED
31,125✔
153
        if !isclusterlazy()
18✔
154
            if PGRP.topology === :all_to_all
×
155
                # Since higher pids connect with lower pids, the remote worker
156
                # may not have connected to us yet. Wait for some time.
157
                wait_for_conn(w)
×
158
            else
159
                error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
×
160
            end
161
        else
162
            w.ct_time = time()
9✔
163
            if myid() > w.id
9✔
164
                t = @async exec_conn_func(w)
12✔
165
            else
166
                # route request via node 1
167
                t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
9✔
168
            end
169
            errormonitor(t)
9✔
170
            wait_for_conn(w)
31,125✔
171
        end
172
    end
173
end
174

175
exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker)
3✔
176
function exec_conn_func(w::Worker)
9✔
177
    try
9✔
178
        f = notnothing(w.conn_func)
18✔
179
        # Will be called if some other task tries to connect at the same time.
180
        w.conn_func = () -> wait_for_conn(w)
9✔
181
        f()
9✔
182
    catch e
183
        w.conn_func = () -> throw(e)
×
184
        rethrow()
×
185
    end
186
    nothing
9✔
187
end
188

189
function wait_for_conn(w)
9✔
190
    if w.state === W_CREATED
9✔
191
        timeout =  worker_timeout() - (time() - w.ct_time)
9✔
192
        timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
9✔
193

194
        @async (sleep(timeout); notify(w.c_state; all=true))
21✔
195
        wait(w.c_state)
9✔
196
        w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
9✔
197
    end
198
    nothing
9✔
199
end
200

201
## process group creation ##
202

203
mutable struct LocalProcess
204
    id::Int
205
    bind_addr::String
206
    bind_port::UInt16
207
    cookie::String
208
    LocalProcess() = new(1)
3✔
209
end
210

211
worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0"))
248✔
212

213

214
## worker creation and setup ##
215
"""
216
    start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
217

218
`start_worker` is an internal function which is the default entry point for
219
worker processes connecting via TCP/IP. It sets up the process as a Julia cluster
220
worker.
221

222
host:port information is written to stream `out` (defaults to stdout).
223

224
The function reads the cookie from stdin if required, and listens on a free port
225
(or if specified, the port in the `--bind-to` command line option) and schedules
226
tasks to process incoming TCP connections and requests. It also (optionally)
227
closes stdin and redirects stderr to stdout.
228

229
It does not return.
230
"""
231
start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...)
227✔
232
function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
156✔
233
    init_multi()
78✔
234

235
    if close_stdin # workers will not use it
78✔
236
        redirect_stdin(devnull)
76✔
237
        close(stdin)
76✔
238
    end
239
    stderr_to_stdout && redirect_stderr(stdout)
78✔
240

241
    init_worker(cookie)
78✔
242
    interface = IPv4(LPROC.bind_addr)
78✔
243
    if LPROC.bind_port == 0
78✔
244
        port_hint = 9000 + (getpid() % 1000)
78✔
245
        (port, sock) = listenany(interface, UInt16(port_hint))
78✔
246
        LPROC.bind_port = port
78✔
247
    else
248
        sock = listen(interface, LPROC.bind_port)
×
249
    end
250
    errormonitor(@async while isopen(sock)
156✔
251
        client = accept(sock)
338✔
252
        process_messages(client, client, true)
182✔
253
    end)
91✔
254
    print(out, "julia_worker:")  # print header
78✔
255
    print(out, "$(string(LPROC.bind_port))#") # print port
77✔
256
    print(out, LPROC.bind_addr)
77✔
257
    print(out, '\n')
77✔
258
    flush(out)
77✔
259

260
    Sockets.nagle(sock, false)
77✔
261
    Sockets.quickack(sock, true)
77✔
262

263
    if ccall(:jl_running_on_valgrind,Cint,()) != 0
77✔
264
        println(out, "PID = $(getpid())")
×
265
    end
266

267
    try
77✔
268
        # To prevent hanging processes on remote machines, newly launched workers exit if the
269
        # master process does not connect in time.
270
        check_master_connect()
154✔
271
        while true; wait(); end
154✔
272
    catch err
273
        print(stderr, "unhandled exception on $(myid()): $(err)\nexiting.\n")
×
274
    end
275

276
    close(sock)
×
277
    exit(0)
×
278
end
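
# Example (illustrative sketch): the manager-side counterpart of the worker
# entry point above, following the same pattern `launch_additional` uses
# further down: spawn `julia --worker`, send the cookie on stdin, then read
# the "julia_worker:<port>#<bind_addr>" header back.
io = open(detach(`$(Base.julia_cmd()) --worker`), "r+")
write_cookie(io)                         # defined near the end of this file
bind_addr, port = read_worker_host_port(io.out)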
279

280

281
function redirect_worker_output(ident, stream)
77✔
282
    t = @async while !eof(stream)
154✔
283
        line = readline(stream)
3,909✔
284
        if startswith(line, "      From worker ")
7,304✔
285
            # stdout's of "additional" workers started from an initial worker on a host are not available
286
            # on the master directly - they are routed via the initial worker's stdout.
287
            println(line)
×
288
        else
289
            println("      From worker $(ident):\t$line")
3,909✔
290
        end
291
    end
3,971✔
292
    errormonitor(t)
77✔
293
end
294

295
struct LaunchWorkerError <: Exception
296
    msg::String
4✔
297
end
298

299
Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg)
2✔
300

301
# The default TCP transport relies on the worker listening on a free
302
# port available and printing its bind address and port.
303
# The master process uses this to connect to the worker and subsequently
304
# setup a all-to-all network.
305
function read_worker_host_port(io::IO)
81✔
306
    t0 = time_ns()
81✔
307

308
    # Wait at most for JULIA_WORKER_TIMEOUT seconds to read host:port
309
    # info from the worker
310
    timeout = worker_timeout() * 1e9
81✔
311
    # We expect the first line to contain the host:port string. However, as
312
    # the worker may be launched via ssh or a cluster manager like SLURM,
313
    # ignore any informational / warning lines printed by the launch command.
314
    # If we do not find the host:port string in the first 1000 lines, treat it
315
    # as an error.
316

317
    ntries = 1000
81✔
318
    leader = String[]
81✔
319
    try
81✔
320
        while ntries > 0
1,081✔
321
            readtask = @async readline(io)
2,160✔
322
            yield()
1,080✔
323
            while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
125,197✔
324
                sleep(0.05)
124,117✔
325
            end
124,117✔
326
            !istaskdone(readtask) && break
1,080✔
327

328
            conninfo = fetch(readtask)
1,078✔
329
            if isempty(conninfo) && !isopen(io)
1,078✔
330
                throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?"))
1✔
331
            end
332

333
            ntries -= 1
1,077✔
334
            bind_addr, port = parse_connection_info(conninfo)
1,077✔
335
            if !isempty(bind_addr)
1,077✔
336
                return bind_addr, port
77✔
337
            end
338

339
            # collect unmatched lines
340
            push!(leader, conninfo)
1,000✔
341
        end
1,000✔
342
        close(io)
3✔
343
        if ntries > 0
3✔
344
            throw(LaunchWorkerError("Timed out waiting to read host:port string from worker."))
2✔
345
        else
346
            throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found."))
5✔
347
        end
348
    finally
349
        for line in leader
161✔
350
            println("\tFrom worker startup:\t", line)
1,000✔
351
        end
1,000✔
352
    end
353
end
354

355
function parse_connection_info(str)
1,077✔
356
    m = match(r"^julia_worker:(\d+)#(.*)", str)
1,077✔
357
    if m !== nothing
1,077✔
358
        (String(m.captures[2]), parse(UInt16, m.captures[1]))
77✔
359
    else
360
        ("", UInt16(0))
1,000✔
361
    end
362
end
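
# Example (illustrative): the header format accepted above.
@assert parse_connection_info("julia_worker:9501#127.0.0.1") == ("127.0.0.1", UInt16(9501))
@assert parse_connection_info("some launcher banner") == ("", UInt16(0))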
363

364
"""
365
    init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
366

367
Called by cluster managers implementing custom transports. It initializes a newly launched
368
process as a worker. Command line argument `--worker[=<cookie>]` has the effect of initializing a
369
process as a worker using TCP/IP sockets for transport.
370
`cookie` is a [`cluster_cookie`](@ref).
371
"""
372
function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
156✔
373
    myrole!(:worker)
156✔
374

375
    # On workers, the default cluster manager connects via TCP sockets. Custom
376
    # transports will need to call this function with their own manager.
377
    global cluster_manager
78✔
378
    cluster_manager = manager
78✔
379

380
    # Since our pid has yet to be set, ensure that no RemoteChannel / Future has been created and addprocs() has not been called.
381
    @assert nprocs() <= 1
78✔
382
    @assert isempty(PGRP.refs)
78✔
383
    @assert isempty(client_refs)
78✔
384

385
    # System is started in head node mode, cleanup related entries
386
    empty!(PGRP.workers)
78✔
387
    empty!(map_pid_wrkr)
78✔
388

389
    cluster_cookie(cookie)
78✔
390
    nothing
78✔
391
end
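
# Example (illustrative sketch): a custom-transport worker entry point, per
# the docstring above. `ToyManager` is the hypothetical manager sketched
# earlier; `rd`/`wr` stand in for whatever streams the transport provides.
function toy_start_worker(cookie::AbstractString, rd::IO, wr::IO)
    init_worker(cookie, ToyManager(1))
    process_messages(rd, wr, true)   # handle the master's incoming connection
    while true; wait(); end          # keep the worker alive, as start_worker does
end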
392

393

394
# The main function for adding worker processes.
395
# `manager` is of type ClusterManager. The respective managers are responsible
396
# for launching the workers. All keyword arguments (plus a few default values)
397
# are available as a dictionary to the `launch` methods
398
#
399
# Only one addprocs can be in progress at any time
400
#
401
const worker_lock = ReentrantLock()
402

403
"""
404
    addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
405

406
Launches worker processes via the specified cluster manager.
407

408
For example, Beowulf clusters are supported via a custom cluster manager implemented in
409
the package `ClusterManagers.jl`.
410

411
The number of seconds a newly launched worker waits for connection establishment from the
412
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
413
environment. Relevant only when using TCP/IP as transport.
414

415
To launch workers without blocking the REPL, or the containing function
416
if launching workers programmatically, execute `addprocs` in its own task.
417

418
# Examples
419

420
```julia
421
# On busy clusters, call `addprocs` asynchronously
422
t = @async addprocs(...)
423
```
424

425
```julia
426
# Utilize workers as and when they come online
427
if nprocs() > 1   # Ensure at least one new worker is available
428
   ....   # perform distributed execution
429
end
430
```
431

432
```julia
433
# Retrieve newly launched worker IDs, or any error messages
434
if istaskdone(t)   # Check if `addprocs` has completed to ensure `fetch` doesn't block
435
    if nworkers() == N
436
        new_pids = fetch(t)
437
    else
438
        fetch(t)
439
    end
440
end
441
```
442
"""
443
function addprocs(manager::ClusterManager; kwargs...)
110✔
444
    init_multi()
55✔
445

446
    cluster_mgmt_from_master_check()
55✔
447

448
    lock(worker_lock)
54✔
449
    try
54✔
450
        addprocs_locked(manager::ClusterManager; kwargs...)
58✔
451
    finally
452
        unlock(worker_lock)
54✔
453
    end
454
end
455

456
function addprocs_locked(manager::ClusterManager; kwargs...)
108✔
457
    params = merge(default_addprocs_params(manager), Dict{Symbol,Any}(kwargs))
54✔
458
    topology(Symbol(params[:topology]))
54✔
459

460
    if PGRP.topology !== :all_to_all
54✔
461
        params[:lazy] = false
×
462
    end
463

464
    if PGRP.lazy === nothing || nprocs() == 1
98✔
465
        PGRP.lazy = params[:lazy]
21✔
466
    elseif isclusterlazy() != params[:lazy]
66✔
467
        throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(),
×
468
                                    ". Cannot set lazy=", params[:lazy])))
469
    end
470

471
    # References to launched workers, filled when each worker is fully initialized and
472
    # has connected to all nodes.
473
    launched_q = Int[]   # Asynchronously filled by the launch method
54✔
474

475
    # The `launch` method should add an object of type WorkerConfig for every
476
    # worker launched. It provides information required on how to connect
477
    # to it.
478

479
    # FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
480
    # but both are part of the public interface. This means we currently can't use
481
    # `Threads.@spawn` in the code below.
482
    launched = WorkerConfig[]
54✔
483
    launch_ntfy = Condition()
54✔
484

485
    # Call the manager's `launch` in a separate task. This allows the master
486
    # process to initiate connection setup as and when workers come
487
    # online
488
    t_launch = @async launch(manager, params, launched, launch_ntfy)
108✔
489

490
    @sync begin
108✔
491
        while true
136✔
492
            if isempty(launched)
136✔
493
                istaskdone(t_launch) && break
108✔
494
                @async (sleep(1); notify(launch_ntfy))
162✔
495
                wait(launch_ntfy)
54✔
496
            end
497

498
            if !isempty(launched)
82✔
499
                wconfig = popfirst!(launched)
81✔
500
                let wconfig=wconfig
81✔
501
                    @async setup_launched_worker(manager, wconfig, launched_q)
162✔
502
                end
503
            end
504
        end
82✔
505
    end
506

507
    Base.wait(t_launch)      # catches any thrown errors from the launch task
50✔
508

509
    # Since all worker-to-worker setups may not have completed by the time this
510
    # function returns to the caller, send the complete list to all workers.
511
    # Useful for nprocs(), nworkers(), etc to return valid values on the workers.
512
    all_w = workers()
99✔
513
    for pid in all_w
50✔
514
        remote_do(set_valid_processes, pid, all_w)
929✔
515
    end
515✔
516

517
    sort!(launched_q)
50✔
518
end
519

520
function set_valid_processes(plist::Array{Int})
440✔
521
    for pid in setdiff(plist, workers())
493✔
522
        myid() != pid && Worker(pid)
494✔
523
    end
494✔
524
end
525

526
"""
527
    default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
528

529
Implemented by cluster managers. Returns the default keyword parameters passed when calling
530
`addprocs(mgr)`. The minimal set of options is available by calling
531
`default_addprocs_params()`.
532
"""
533
default_addprocs_params(::ClusterManager) = default_addprocs_params()
97✔
534
default_addprocs_params() = Dict{Symbol,Any}(
99✔
535
    :topology => :all_to_all,
536
    :dir      => pwd(),
537
    :exename  => joinpath(Sys.BINDIR, julia_exename()),
538
    :exeflags => ``,
539
    :env      => [],
540
    :enable_threaded_blas => false,
541
    :lazy => true)
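
# Example (illustrative sketch): a custom manager can layer its own defaults
# on top of the minimal set. `ToyManager` and `:toy_flags` are hypothetical.
function default_addprocs_params(::ToyManager)
    params = default_addprocs_params()
    params[:toy_flags] = ``          # hypothetical manager-specific option
    return params
end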
542

543

544
function setup_launched_worker(manager, wconfig, launched_q)
81✔
545
    pid = create_worker(manager, wconfig)
81✔
546
    push!(launched_q, pid)
77✔
547

548
    # When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
549
    # process on the remote machine, with a request to start additional workers of the
550
    # same type. This is done by setting an appropriate value in `WorkerConfig.count`.
551
    cnt = something(wconfig.count, 1)
77✔
552
    if cnt === :auto
77✔
553
        cnt = wconfig.environ[:cpu_threads]
×
554
    end
555
    cnt = cnt - 1   # Removing self from the requested number
77✔
556

557
    if cnt > 0
77✔
558
        launch_n_additional_processes(manager, pid, wconfig, cnt, launched_q)
×
559
    end
560
end
561

562

563
function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launched_q)
×
564
    @sync begin
×
565
        exename = notnothing(fromconfig.exename)
×
566
        exeflags = something(fromconfig.exeflags, ``)
×
567
        cmd = `$exename $exeflags`
×
568

569
        new_addresses = remotecall_fetch(launch_additional, frompid, cnt, cmd)
×
570
        for address in new_addresses
×
571
            (bind_addr, port) = address
×
572

573
            wconfig = WorkerConfig()
×
574
            for x in [:host, :tunnel, :multiplex, :sshflags, :exeflags, :exename, :enable_threaded_blas]
×
575
                Base.setproperty!(wconfig, x, Base.getproperty(fromconfig, x))
×
576
            end
×
577
            wconfig.bind_addr = bind_addr
×
578
            wconfig.port = port
×
579

580
            let wconfig=wconfig
×
581
                @async begin
×
582
                    pid = create_worker(manager, wconfig)
×
583
                    remote_do(redirect_output_from_additional_worker, frompid, pid, port)
×
584
                    push!(launched_q, pid)
×
585
                end
586
            end
587
        end
×
588
    end
589
end
590

591
function create_worker(manager, wconfig)
81✔
592
    # only node 1 can add new nodes, since nobody else has the full list of address:port
593
    @assert LPROC.id == 1
81✔
594
    timeout = worker_timeout()
81✔
595

596
    # initiate a connect. Does not wait for connection completion in case of TCP.
597
    w = Worker()
81✔
598
    local r_s, w_s
×
599
    try
81✔
600
        (r_s, w_s) = connect(manager, w.id, wconfig)
81✔
601
    catch ex
602
        try
4✔
603
            deregister_worker(w.id)
4✔
604
            kill(manager, w.id, wconfig)
8✔
605
        finally
606
            rethrow(ex)
4✔
607
        end
608
    end
609

610
    w = Worker(w.id, r_s, w_s, manager; config=wconfig)
77✔
611
    # install a finalizer to perform cleanup if necessary
612
    finalizer(w) do w
77✔
613
        if myid() == 1
55✔
614
            manage(w.manager, w.id, w.config, :finalize)
55✔
615
        end
616
    end
617

618
    # set when the new worker has finished connections with all other workers
619
    ntfy_oid = RRID()
77✔
620
    rr_ntfy_join = lookup_ref(ntfy_oid)
77✔
621
    rr_ntfy_join.waitingfor = myid()
77✔
622

623
    # Start a new task on the master to handle inbound messages from the connected worker.
624
    # Also calls `wait_connected` on TCP streams.
625
    process_messages(w.r_stream, w.w_stream, false)
77✔
626

627
    # send address information of all workers to the new worker.
628
    # Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
629
    # A new worker uses this to set up an all-to-all network if topology :all_to_all is specified.
630
    # Workers with higher pids connect to workers with lower pids, except process 1 (master), which
631
    # initiates connections to all workers.
632

633
    # Connection Setup Protocol:
634
    # - Master sends 16-byte cookie followed by 16-byte version string and a JoinPGRP message to all workers
635
    # - On each worker
636
    #   - Worker responds with a 16-byte version followed by a JoinCompleteMsg
637
    #   - Connects to all workers less than its pid. Sends the cookie, version and an IdentifySocket message
638
    #   - Workers with incoming connection requests write back their Version and an IdentifySocketAckMsg message
639
    # - On master, receiving a JoinCompleteMsg triggers rr_ntfy_join (signifies that worker setup is complete)
640

641
    join_list = []
77✔
642
    if PGRP.topology === :all_to_all
77✔
643
        # need to wait for lower worker pids to have completed connecting, since the numerical value
644
        # of pids is relevant to the connection process, i.e., higher pids connect to lower pids and they
645
        # require the value of config.connect_at which is set only upon connection completion
646
        for jw in PGRP.workers
77✔
647
            if (jw.id != 1) && (jw.id < w.id)
833✔
648
                (jw.state === W_CREATED) && wait(jw.c_state)
537✔
649
                push!(join_list, jw)
537✔
650
            end
651
        end
910✔
652

653
    elseif PGRP.topology === :custom
×
654
        # wait for requested workers to be up before connecting to them.
655
        filterfunc(x) = (x.id != 1) && isdefined(x, :config) &&
×
656
            (notnothing(x.config.ident) in something(wconfig.connect_idents, []))
657

658
        wlist = filter(filterfunc, PGRP.workers)
×
659
        waittime = 0
×
660
        while wconfig.connect_idents !== nothing &&
×
661
              length(wlist) < length(wconfig.connect_idents)
662
            if waittime >= timeout
×
663
                error("peer workers did not connect within $timeout seconds")
×
664
            end
665
            sleep(1.0)
×
666
            waittime += 1
×
667
            wlist = filter(filterfunc, PGRP.workers)
×
668
        end
×
669

670
        for wl in wlist
×
671
            (wl.state === W_CREATED) && wait(wl.c_state)
×
672
            push!(join_list, wl)
×
673
        end
×
674
    end
675

676
    all_locs = mapany(x -> isa(x, Worker) ?
614✔
677
                      (something(x.config.connect_at, ()), x.id) :
678
                      ((), x.id, true),
679
                      join_list)
680
    send_connection_hdr(w, true)
77✔
681
    enable_threaded_blas = something(wconfig.enable_threaded_blas, false)
147✔
682
    join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
77✔
683
    send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)
77✔
684

685
    @async manage(w.manager, w.id, w.config, :register)
154✔
686
    # wait for rr_ntfy_join with timeout
687
    timedout = false
77✔
688
    @async (sleep($timeout); timedout = true; put!(rr_ntfy_join, 1))
274✔
689
    wait(rr_ntfy_join)
77✔
690
    if timedout
77✔
691
        error("worker did not connect within $timeout seconds")
×
692
    end
693
    lock(client_refs) do
77✔
694
        delete!(PGRP.refs, ntfy_oid)
77✔
695
    end
696

697
    return w.id
77✔
698
end
699

700

701
# Called on the first worker on a remote host. Used to optimize launching
702
# of multiple workers on a remote host (to leverage multi-core)
703

704
additional_io_objs=Dict()
705
function launch_additional(np::Integer, cmd::Cmd)
×
706
    io_objs = Vector{Any}(undef, np)
×
707
    addresses = Vector{Any}(undef, np)
×
708

709
    for i in 1:np
×
710
        io = open(detach(cmd), "r+")
×
711
        write_cookie(io)
×
712
        io_objs[i] = io.out
×
713
    end
×
714

715
    for (i,io) in enumerate(io_objs)
×
716
        (host, port) = read_worker_host_port(io)
×
717
        addresses[i] = (host, port)
×
718
        additional_io_objs[port] = io
×
719
    end
×
720

721
    return addresses
×
722
end
723

724
function redirect_output_from_additional_worker(pid, port)
×
725
    io = additional_io_objs[port]
×
726
    redirect_worker_output("$pid", io)
×
727
    delete!(additional_io_objs, port)
×
728
    nothing
×
729
end
730

731
function check_master_connect()
77✔
732
    timeout = worker_timeout() * 1e9
77✔
733
    # If process 1 has not connected to us within the timeout
734
    # we log an error and exit, unless we're running on valgrind
735
    if ccall(:jl_running_on_valgrind,Cint,()) != 0
77✔
736
        return
×
737
    end
738
    @async begin
154✔
739
        start = time_ns()
77✔
740
        while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
1,444✔
741
            sleep(1.0)
1,367✔
742
        end
1,367✔
743

744
        if !haskey(map_pid_wrkr, 1)
77✔
745
            print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
1✔
746
            exit(1)
1✔
747
        end
748
    end
749
end
750

751

752
"""
753
    cluster_cookie() -> cookie
754

755
Return the cluster cookie.
756
"""
757
cluster_cookie() = (init_multi(); LPROC.cookie)
328✔
758

759
"""
760
    cluster_cookie(cookie) -> cookie
761

762
Set the passed cookie as the cluster cookie, then return it.
763
"""
764
function cluster_cookie(cookie)
169✔
765
    init_multi()
169✔
766
    # The cookie must be an ASCII string with length <= HDR_COOKIE_LEN
767
    @assert isascii(cookie)
169✔
768
    @assert length(cookie) <= HDR_COOKIE_LEN
169✔
769

770
    cookie = rpad(cookie, HDR_COOKIE_LEN)
169✔
771

772
    LPROC.cookie = cookie
169✔
773
    cookie
169✔
774
end
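
# Example (illustrative): cookies are ASCII and right-padded to the fixed
# header length used by the wire protocol.
@assert cluster_cookie("secret") == rpad("secret", HDR_COOKIE_LEN)
@assert length(cluster_cookie()) == HDR_COOKIE_LEN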
775

776

777
let next_pid = 2    # 1 is reserved for the client (always)
778
    global get_next_pid
779
    function get_next_pid()
81✔
780
        retval = next_pid
81✔
781
        next_pid += 1
81✔
782
        retval
81✔
783
    end
784
end
785

786
mutable struct ProcessGroup
787
    name::String
788
    workers::Array{Any,1}
789
    refs::Dict{RRID,Any}                  # global references
790
    topology::Symbol
791
    lazy::Union{Bool, Nothing}
792

793
    ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, nothing)
3✔
794
end
795
const PGRP = ProcessGroup([])
796

797
function topology(t)
130✔
798
    @assert t in [:all_to_all, :master_worker, :custom]
130✔
799
    if (PGRP.topology==t) || ((myid()==1) && (nprocs()==1)) || (myid() > 1)
130✔
800
        PGRP.topology = t
130✔
801
    else
802
        error("Workers with Topology $(PGRP.topology) already exist. Requested Topology $(t) cannot be set.")
×
803
    end
804
    t
130✔
805
end
806

807
isclusterlazy() = something(PGRP.lazy, false)
1,686✔
808

809
get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
×
810
get_bind_addr(w::LocalProcess) = LPROC.bind_addr
×
811
function get_bind_addr(w::Worker)
×
812
    if w.config.bind_addr === nothing
×
813
        if w.id != myid()
×
814
            w.config.bind_addr = remotecall_fetch(get_bind_addr, w.id, w.id)
×
815
        end
816
    end
817
    w.config.bind_addr
×
818
end
819

820
# globals
821
const LPROC = LocalProcess()
822
const LPROCROLE = Ref{Symbol}(:master)
823
const HDR_VERSION_LEN=16
824
const HDR_COOKIE_LEN=16
825
const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
826
const map_sock_wrkr = IdDict()
827
const map_del_wrkr = Set{Int}()
828

829
# whether process is a master or worker in a distributed setup
830
myrole() = LPROCROLE[]
71✔
831
function myrole!(proctype::Symbol)
78✔
832
    LPROCROLE[] = proctype
78✔
833
end
834

835
# cluster management related API
836
"""
837
    myid()
838

839
Get the id of the current process.
840

841
# Examples
842
```julia-repl
843
julia> myid()
844
1
845

846
julia> remotecall_fetch(() -> myid(), 4)
847
4
848
```
849
"""
850
myid() = LPROC.id
700,703✔
851

852
"""
853
    nprocs()
854

855
Get the number of available processes.
856

857
# Examples
858
```julia-repl
859
julia> nprocs()
860
3
861

862
julia> workers()
863
2-element Array{Int64,1}:
864
 2
865
 3
866
```
867
"""
868
function nprocs()
452✔
869
    if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
460✔
870
        n = length(PGRP.workers)
452✔
871
        # filter out workers in the process of being set up or shut down.
872
        for jw in PGRP.workers
452✔
873
            if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
4,638✔
874
                n = n - 1
×
875
            end
876
        end
5,090✔
877
        return n
452✔
878
    else
879
        return length(PGRP.workers)
×
880
    end
881
end
882

883
"""
884
    nworkers()
885

886
Get the number of available worker processes. This is one less than [`nprocs()`](@ref). Equal to
887
`nprocs()` if `nprocs() == 1`.
888

889
# Examples
890
```julia-repl
891
\$ julia -p 2
892

893
julia> nprocs()
894
3
895

896
julia> nworkers()
897
2
898
```
899
"""
900
function nworkers()
269✔
901
    n = nprocs()
269✔
902
    n == 1 ? 1 : n-1
527✔
903
end
904

905
"""
906
    procs()
907

908
Return a list of all process identifiers, including pid 1 (which is not included by [`workers()`](@ref)).
909

910
# Examples
911
```julia-repl
912
\$ julia -p 2
913

914
julia> procs()
915
3-element Array{Int64,1}:
916
 1
917
 2
918
 3
919
```
920
"""
921
function procs()
696✔
922
    if myid() == 1 || (PGRP.topology === :all_to_all  && !isclusterlazy())
1,576✔
923
        # filter out workers in the process of being set up or shut down.
924
        return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
260✔
925
    else
926
        return Int[x.id for x in PGRP.workers]
436✔
927
    end
928
end
929

930
function id_in_procs(id)  # faster version of `id in procs()`
3,460✔
931
    if myid() == 1 || (PGRP.topology === :all_to_all  && !isclusterlazy())
3,896✔
932
        for x in PGRP.workers
3,242✔
933
            if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
15,195✔
934
                return true
3,177✔
935
            end
936
        end
8,906✔
937
    else
938
        for x in PGRP.workers
218✔
939
            if (x.id::Int) == id
229✔
940
                return true
218✔
941
            end
942
        end
11✔
943
    end
944
    return false
65✔
945
end
946

947
"""
948
    procs(pid::Integer)
949

950
Return a list of all process identifiers on the same physical node.
951
Specifically, all workers bound to the same IP address as `pid` are returned.
952
"""
953
function procs(pid::Integer)
46✔
954
    if myid() == 1
46✔
955
        all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
46✔
956
        if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
48✔
957
            Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
276✔
958
        else
959
            ipatpid = get_bind_addr(pid)
×
960
            Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, all_workers)]
×
961
        end
962
    else
963
        remotecall_fetch(procs, 1, pid)
×
964
    end
965
end
966

967
"""
968
    workers()
969

970
Return a list of all worker process identifiers.
971

972
# Examples
973
```julia-repl
974
\$ julia -p 2
975

976
julia> workers()
977
2-element Array{Int64,1}:
978
 2
979
 3
980
```
981
"""
982
function workers()
639✔
983
    allp = procs()
639✔
984
    if length(allp) == 1
639✔
985
       allp
19✔
986
    else
987
       filter(x -> x != 1, allp)
8,341✔
988
    end
989
end
990

991
function cluster_mgmt_from_master_check()
112✔
992
    if myid() != 1
112✔
993
        throw(ErrorException("Only process 1 can add and remove workers"))
2✔
994
    end
995
end
996

997
"""
998
    rmprocs(pids...; waitfor=typemax(Int))
999

1000
Remove the specified workers. Note that only process 1 can add or remove
1001
workers.
1002

1003
Argument `waitfor` specifies how long to wait for the workers to shut down:
1004
  - If unspecified, `rmprocs` will wait until all requested `pids` are removed.
1005
  - An [`ErrorException`](@ref) is raised if the workers cannot all be terminated within
1006
    the requested `waitfor` seconds.
1007
  - With a `waitfor` value of 0, the call returns immediately with the workers
1008
    scheduled for removal in a different task. The scheduled [`Task`](@ref) object is
1009
    returned. The user should call [`wait`](@ref) on the task before invoking any other
1010
    parallel calls.
1011

1012
# Examples
1013
```julia-repl
1014
\$ julia -p 5
1015

1016
julia> t = rmprocs(2, 3, waitfor=0)
1017
Task (runnable) @0x0000000107c718d0
1018

1019
julia> wait(t)
1020

1021
julia> workers()
1022
3-element Array{Int64,1}:
1023
 4
1024
 5
1025
 6
1026
```
1027
"""
1028
function rmprocs(pids...; waitfor=typemax(Int))
114✔
1029
    cluster_mgmt_from_master_check()
57✔
1030

1031
    pids = vcat(pids...)
56✔
1032
    if waitfor == 0
56✔
1033
        t = @async _rmprocs(pids, typemax(Int))
×
1034
        yield()
×
1035
        return t
×
1036
    else
1037
        _rmprocs(pids, waitfor)
56✔
1038
        # return a dummy task object that user code can wait on.
1039
        return @async nothing
112✔
1040
    end
1041
end
1042

1043
function _rmprocs(pids, waitfor)
56✔
1044
    lock(worker_lock)
91✔
1045
    try
56✔
1046
        rmprocset = Union{LocalProcess, Worker}[]
56✔
1047
        for p in pids
56✔
1048
            if p == 1
62✔
1049
                @warn "rmprocs: process 1 not removed"
×
1050
            else
1051
                if haskey(map_pid_wrkr, p)
62✔
1052
                    w = map_pid_wrkr[p]
62✔
1053
                    set_worker_state(w, W_TERMINATING)
124✔
1054
                    kill(w.manager, p, w.config)
69✔
1055
                    push!(rmprocset, w)
62✔
1056
                end
1057
            end
1058
        end
118✔
1059

1060
        start = time_ns()
56✔
1061
        while (time_ns() - start) < waitfor*1e9
342✔
1062
            all(w -> w.state === W_TERMINATED, rmprocset) && break
746✔
1063
            sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
286✔
1064
        end
286✔
1065

1066
        unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
118✔
1067
        if length(unremoved) > 0
56✔
1068
            estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
×
1069
            throw(ErrorException(estr))
×
1070
        end
1071
    finally
1072
        unlock(worker_lock)
56✔
1073
    end
1074
end
1075

1076

1077
"""
1078
    ProcessExitedException(worker_id::Int)
1079

1080
After a client Julia process has exited, further attempts to reference the dead child will
1081
throw this exception.
1082
"""
1083
struct ProcessExitedException <: Exception
1084
    worker_id::Int
4✔
1085
end
1086

1087
# No-arg constructor added for compatibility with Julia 1.0 & 1.1, should be deprecated in the future
1088
ProcessExitedException() = ProcessExitedException(-1)
×
1089

1090
worker_from_id(i) = worker_from_id(PGRP, i)
131,633✔
1091
function worker_from_id(pg::ProcessGroup, i)
131,633✔
1092
    if !isempty(map_del_wrkr) && in(i, map_del_wrkr)
131,633✔
1093
        throw(ProcessExitedException(i))
4✔
1094
    end
1095
    w = get(map_pid_wrkr, i, nothing)
263,250✔
1096
    if w === nothing
131,629✔
1097
        if myid() == 1
8✔
1098
            error("no process with id $i exists")
8✔
1099
        end
1100
        w = Worker(i)
×
1101
        map_pid_wrkr[i] = w
×
1102
    else
1103
        w = w::Union{Worker, LocalProcess}
×
1104
    end
1105
    w
131,621✔
1106
end
1107

1108
"""
1109
    worker_id_from_socket(s) -> pid
1110

1111
A low-level API which, given an `IO` connection or a `Worker`,
1112
returns the `pid` of the worker it is connected to.
1113
This is useful when writing custom [`serialize`](@ref) methods for a type,
1114
in order to optimize the data written out depending on the receiving process id.
1115
"""
1116
function worker_id_from_socket(s)
16,082✔
1117
    w = get(map_sock_wrkr, s, nothing)
31,890✔
1118
    if isa(w,Worker)
16,082✔
1119
        if s === w.r_stream || s === w.w_stream
15,808✔
1120
            return w.id
15,808✔
1121
        end
1122
    end
1123
    if isa(s,IOStream) && fd(s)==-1
274✔
1124
        # serializing to a local buffer
1125
        return myid()
×
1126
    end
1127
    return -1
274✔
1128
end
1129

1130

1131
register_worker(w) = register_worker(PGRP, w)
1,360✔
1132
function register_worker(pg, w)
×
1133
    push!(pg.workers, w)
1,360✔
1134
    map_pid_wrkr[w.id] = w
1,360✔
1135
end
1136

1137
function register_worker_streams(w)
183✔
1138
    map_sock_wrkr[w.r_stream] = w
183✔
1139
    map_sock_wrkr[w.w_stream] = w
183✔
1140
end
1141

1142
deregister_worker(pid) = deregister_worker(PGRP, pid)
572✔
1143
function deregister_worker(pg, pid)
572✔
1144
    pg.workers = filter(x -> !(x.id == pid), pg.workers)
8,701✔
1145
    w = pop!(map_pid_wrkr, pid, nothing)
587✔
1146
    if isa(w, Worker)
572✔
1147
        if isdefined(w, :r_stream)
557✔
1148
            pop!(map_sock_wrkr, w.r_stream, nothing)
124✔
1149
            if w.r_stream != w.w_stream
62✔
1150
                pop!(map_sock_wrkr, w.w_stream, nothing)
×
1151
            end
1152
        end
1153

1154
        if myid() == 1 && (myrole() === :master) && isdefined(w, :config)
557✔
1155
            # Notify the cluster manager of this worker's death
1156
            manage(w.manager, w.id, w.config, :deregister)
69✔
1157
            if PGRP.topology !== :all_to_all || isclusterlazy()
124✔
1158
                for rpid in workers()
62✔
1159
                    try
530✔
1160
                        remote_do(deregister_worker, rpid, pid)
1,045✔
1161
                    catch
×
1162
                    end
1163
                end
592✔
1164
            end
1165
        end
1166
    end
1167
    push!(map_del_wrkr, pid)
572✔
1168

1169
    # delete this worker from our remote reference client sets
1170
    ids = []
572✔
1171
    tonotify = []
572✔
1172
    lock(client_refs) do
572✔
1173
        for (id, rv) in pg.refs
672✔
1174
            if in(pid, rv.clientset)
816✔
1175
                push!(ids, id)
5✔
1176
            end
1177
            if rv.waitingfor == pid
816✔
1178
                push!(tonotify, (id, rv))
×
1179
            end
1180
        end
1,532✔
1181
        for id in ids
1,140✔
1182
            del_client(pg, id, pid)
5✔
1183
        end
9✔
1184

1185
        # throw exception to tasks waiting for this pid
1186
        for (id, rv) in tonotify
1,144✔
1187
            close(rv.c, ProcessExitedException(pid))
×
1188
            delete!(pg.refs, id)
×
1189
        end
×
1190
    end
1191
    return
572✔
1192
end
1193

1194

1195
function interrupt(pid::Integer)
×
1196
    @assert myid() == 1
×
1197
    w = map_pid_wrkr[pid]
×
1198
    if isa(w, Worker)
×
1199
        manage(w.manager, w.id, w.config, :interrupt)
×
1200
    end
1201
    return
×
1202
end
1203

1204
"""
1205
    interrupt(pids::Integer...)
1206

1207
Interrupt the current executing task on the specified workers. This is equivalent to
1208
pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
1209
"""
1210
interrupt(pids::Integer...) = interrupt([pids...])
×
1211

1212
"""
1213
    interrupt(pids::AbstractVector=workers())
1214

1215
Interrupt the current executing task on the specified workers. This is equivalent to
1216
pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
1217
"""
1218
function interrupt(pids::AbstractVector=workers())
×
1219
    @assert myid() == 1
×
1220
    @sync begin
×
1221
        for pid in pids
×
1222
            @async interrupt(pid)
×
1223
        end
×
1224
    end
1225
end
1226

1227
wp_bind_addr(p::LocalProcess) = p.bind_addr
×
1228
wp_bind_addr(p) = p.config.bind_addr
×
1229

1230
function check_same_host(pids)
5✔
1231
    if myid() != 1
5✔
1232
        return remotecall_fetch(check_same_host, 1, pids)
×
1233
    else
1234
        # We first check if all the given pids have been started using the local manager,
1235
        # else we check for the same bind_to addr. This handles the special case
1236
        # where the local ip address may change - as during a system sleep/awake
1237
        if all(p -> (p==1) || (isa(map_pid_wrkr[p].manager, LocalManager)), pids)
29✔
1238
            return true
5✔
1239
        else
1240
            first_bind_addr = notnothing(wp_bind_addr(map_pid_wrkr[pids[1]]))
×
1241
            return all(p -> notnothing(wp_bind_addr(map_pid_wrkr[p])) == first_bind_addr, pids[2:end])
×
1242
        end
1243
    end
1244
end
1245

1246
function terminate_all_workers()
×
1247
    myid() != 1 && return
×
1248

1249
    if nprocs() > 1
×
1250
        try
×
1251
            rmprocs(workers(); waitfor=5.0)
×
1252
        catch _ex
1253
            @warn "Forcibly interrupting busy workers" exception=_ex
×
1254
            # Might be computation bound, interrupt them and try again
1255
            interrupt(workers())
×
1256
            try
×
1257
                rmprocs(workers(); waitfor=5.0)
×
1258
            catch _ex2
1259
                @error "Unable to terminate all workers" exception=_ex2,catch_backtrace()
×
1260
            end
1261
        end
1262
    end
1263
end
1264

1265
# initialize the local proc network address / port
1266
function init_bind_addr()
88✔
1267
    opts = JLOptions()
88✔
1268
    if opts.bindto != C_NULL
88✔
1269
        bind_to = split(unsafe_string(opts.bindto), ":")
77✔
1270
        bind_addr = string(parse(IPAddr, bind_to[1]))
77✔
1271
        if length(bind_to) > 1
77✔
1272
            bind_port = parse(Int,bind_to[2])
×
1273
        else
1274
            bind_port = 0
154✔
1275
        end
1276
    else
1277
        bind_port = 0
11✔
1278
        try
11✔
1279
            bind_addr = string(getipaddr())
11✔
1280
        catch
1281
            # All networking is unavailable, initialize bind_addr to the loopback address
1282
            # Will cause an exception to be raised only when used.
1283
            bind_addr = "127.0.0.1"
×
1284
        end
1285
    end
1286
    global LPROC
88✔
1287
    LPROC.bind_addr = bind_addr
88✔
1288
    LPROC.bind_port = UInt16(bind_port)
88✔
1289
end
1290

1291
using Random: randstring
1292

1293
let inited = false
1294
    # do initialization that's only needed when there is more than 1 processor
1295
    global function init_multi()
466✔
1296
        if !inited
466✔
1297
            inited = true
88✔
1298
            push!(Base.package_callbacks, _require_callback)
88✔
1299
            atexit(terminate_all_workers)
88✔
1300
            init_bind_addr()
88✔
1301
            cluster_cookie(randstring(HDR_COOKIE_LEN))
88✔
1302
        end
1303
        return nothing
466✔
1304
    end
1305
end
1306

1307
function init_parallel()
×
1308
    start_gc_msgs_task()
92✔
1309

1310
    # Start in "head node" mode; if this is a worker, it will be overridden later.
1311
    global PGRP
×
1312
    global LPROC
×
1313
    LPROC.id = 1
92✔
1314
    @assert isempty(PGRP.workers)
92✔
1315
    register_worker(LPROC)
92✔
1316
end
1317

1318
write_cookie(io::IO) = print(io.in, string(cluster_cookie(), "\n"))
72✔
1319

1320
function get_threads_spec(opts)
5✔
1321
    if opts.nthreads > 0
5✔
1322
        @assert opts.nthreadpools >= 1
×
1323
        @assert opts.nthreads_per_pool != C_NULL
×
1324
        thr = "$(unsafe_load(opts.nthreads_per_pool))"
×
1325
        if opts.nthreadpools == 2
×
1326
            thr = "$(thr),$(unsafe_load(opts.nthreads_per_pool, 2))"
×
1327
        end
1328
        `--threads=$(thr)`
×
1329
    else
1330
        ``
5✔
1331
    end
1332
end
1333

1334
function get_gcthreads_spec(opts)
5✔
1335
    if opts.nmarkthreads > 0 || opts.nsweepthreads > 0
10✔
1336
        `--gcthreads=$(opts.nmarkthreads),$(opts.nsweepthreads)`
×
1337
    else
1338
        ``
5✔
1339
    end
1340
end
1341

1342
# Starts workers specified by (-n|--procs) and --machine-file command line options
1343
function process_opts(opts)
79✔
1344
    # startup worker.
1345
    # opts.startupfile, opts.load, etc. should not be processed for workers.
1346
    if opts.worker == 1
79✔
1347
        # does not return
1348
        if opts.cookie != C_NULL
74✔
1349
            start_worker(unsafe_string(opts.cookie))
3✔
1350
        else
1351
            start_worker()
71✔
1352
        end
1353
    end
1354

1355
    # Propagate --threads to workers
1356
    threads = get_threads_spec(opts)
5✔
1357
    # Propagate --gcthreads to workers
1358
    gcthreads = get_gcthreads_spec(opts)
5✔
1359

1360
    exeflags = `$threads $gcthreads`
5✔
1361

1362
    # add processors
1363
    if opts.nprocs > 0
5✔
1364
        addprocs(opts.nprocs; exeflags=exeflags)
4✔
1365
    end
1366

1367
    # load processes from machine file
1368
    if opts.machine_file != C_NULL
5✔
1369
        addprocs(load_machine_file(unsafe_string(opts.machine_file)); exeflags=exeflags)
1✔
1370
    end
1371
    return nothing
5✔
1372
end
1373

1374

1375
function load_machine_file(path::AbstractString)
1✔
1376
    machines = []
1✔
1377
    for line in split(read(path, String),'\n'; keepempty=false)
2✔
1378
        s = split(line, '*'; keepempty=false)
×
1379
        map!(strip, s, s)
×
1380
        if length(s) > 1
×
1381
            cnt = all(isdigit, s[1]) ? parse(Int,s[1]) : Symbol(s[1])
×
1382
            push!(machines,(s[2], cnt))
×
1383
        else
1384
            push!(machines,line)
×
1385
        end
1386
    end
×
1387
    return machines
1✔
1388
end
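
# Example (illustrative): the machine-file syntax accepted above; a
# "count*host" line yields a (host, count) tuple, a bare line is kept as-is.
let path = tempname()
    write(path, "4*node1.example.com\nnode2.example.com\n")
    @assert load_machine_file(path) == [("node1.example.com", 4), "node2.example.com"]
end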