• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37431

pending completion
#37431

push

local

web-flow
🤖 [master] Bump the Pkg stdlib from 3ac94b211 to ed505db0b (#48528)

72309 of 77895 relevant lines covered (92.83%)

33191018.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.52
/stdlib/Distributed/src/managers.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
# Built-in SSH and Local Managers
4

5
# Cluster manager that starts workers on remote hosts over SSH.
# `machines` maps each host specification to the number of workers to start
# there (an integer, or a non-numeric marker such as `:auto`).
struct SSHManager <: ClusterManager
    machines::Dict

    # `machines` is an iterable of machine elements, each either
    #   * an address string of the form `[user@]host[:port] bind_addr[:bind_port]`
    #     (one worker), or
    #   * a tuple `(address, cnt)` where `cnt` is a number of workers or `:auto`
    #     (`:auto` launches NUM_CORES workers at the address).
    # Numeric counts for a repeated address are summed; a non-numeric count
    # replaces whatever was recorded for that address so far.
    function SSHManager(machines)
        counts = Dict()
        for spec in machines
            host, cnt = isa(spec, Tuple) ? (spec[1], spec[2]) : (spec, 1)
            previous = get(counts, host, 0)

            if !isa(cnt, Number)
                counts[host] = cnt
            elseif isa(previous, Number)
                counts[host] = previous + Int(cnt)
            else
                # A non-numeric entry (e.g. `:auto`) was recorded earlier; restart the tally.
                counts[host] = Int(cnt)
            end
        end
        new(counts)
    end
end
35

36

37
# Reject any keyword in `kwargs` that is not among the default addprocs
# parameters of `manager`; throws `ArgumentError` on the first offender.
function check_addprocs_args(manager, kwargs)
    allowed = keys(default_addprocs_params(manager))
    for keyname in keys(kwargs)
        if keyname ∉ allowed
            throw(ArgumentError("Invalid keyword argument $(keyname)"))
        end
    end
    return nothing
end
43

44
# SSHManager
45

46
# start and connect to processes via SSH, optionally through an SSH tunnel.
47
# the tunnel is only used from the head (process 1); the nodes are assumed
48
# to be mutually reachable without a tunnel, as is often the case in a cluster.
49
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config
50
# A machine is either a <hostname> or a tuple of (<hostname>, count)
51
"""
52
    addprocs(machines; tunnel=false, sshflags=\`\`, max_parallel=10, kwargs...) -> List of process identifiers
53

54
Add worker processes on remote machines via SSH. Configuration is done with keyword
55
arguments (see below). In particular, the `exename` keyword can be used to specify
56
the path to the `julia` binary on the remote machine(s).
57

58
`machines` is a vector of "machine specifications" which are given as strings of
59
the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to current user and `port`
60
to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect
61
to this worker at the specified `bind_addr` and `port`.
62

63
It is possible to launch multiple processes on a remote host by using a tuple in the
64
`machines` vector of the form `(machine_spec, count)`, where `count` is the number of
65
workers to be launched on the specified host. Passing `:auto` as the worker count will
66
launch as many workers as the number of CPU threads on the remote host.
67

68
**Examples**:
69
```julia
70
addprocs([
71
    "remote1",               # one worker on 'remote1' logging in with the current username
72
    "user@remote2",          # one worker on 'remote2' logging in with the 'user' username
73
    "user@remote3:2222",     # specifying SSH port to '2222' for 'remote3'
74
    ("user@remote4", 4),     # launch 4 workers on 'remote4'
75
    ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
76
])
77
```
78

79
**Keyword arguments**:
80

81
* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
82
  master process. Default is `false`.
83

84
* `multiplex`: if `true` then SSH multiplexing is used for SSH tunneling. Default is `false`.
85

86
* `ssh`: the name or path of the SSH client executable used to start the workers.
87
  Default is `"ssh"`.
88

89
* `sshflags`: specifies additional ssh options, e.g. ``` sshflags=\`-i /home/foo/bar.pem\` ```
90

91
* `max_parallel`: specifies the maximum number of workers connected to in parallel at a
92
  host. Defaults to 10.
93

94
* `shell`: specifies the type of shell to which ssh connects on the workers.
95

96
    + `shell=:posix`: a POSIX-compatible Unix/Linux shell
97
      (sh, ksh, bash, dash, zsh, etc.). The default.
98

99
    + `shell=:csh`: a Unix C shell (csh, tcsh).
100

101
    + `shell=:wincmd`: Microsoft Windows `cmd.exe`.
102

103
* `dir`: specifies the working directory on the workers. Defaults to the host's current
104
  directory (as found by `pwd()`)
105

106
* `enable_threaded_blas`: if `true` then  BLAS will run on multiple threads in added
107
  processes. Default is `false`.
108

109
* `exename`: name of the `julia` executable. Defaults to `"\$(Sys.BINDIR)/julia"` or
110
  `"\$(Sys.BINDIR)/julia-debug"` as the case may be. It is recommended that a common Julia
111
  version is used on all remote machines because serialization and code distribution might
112
  fail otherwise.
113

114
* `exeflags`: additional flags passed to the worker processes.
115

116
* `topology`: Specifies how the workers connect to each other. Sending a message between
117
  unconnected workers results in an error.
118

119
    + `topology=:all_to_all`: All processes are connected to each other. The default.
120

121
    + `topology=:master_worker`: Only the driver process, i.e. `pid` 1 connects to the
122
      workers. The workers do not connect to each other.
123

124
    + `topology=:custom`: The `launch` method of the cluster manager specifies the
125
      connection topology via fields `ident` and `connect_idents` in `WorkerConfig`.
126
      A worker with a cluster manager identity `ident` will connect to all workers specified
127
      in `connect_idents`.
128

129
* `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections
130
  are setup lazily, i.e. they are setup at the first instance of a remote call between
131
  workers. Default is true.
132

133
* `env`: provide an array of string pairs such as
134
  `env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables
135
  are set on the remote machine. By default only the environment variable
136
  `JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote
137
  environment.
138

139
* `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline
140
   option. The (more secure) default behaviour of passing the cookie via ssh stdio
141
   may hang with Windows workers that use older (pre-ConPTY) Julia or Windows versions,
142
   in which case `cmdline_cookie=true` offers a work-around.
143

144
!!! compat "Julia 1.6"
145
    The keyword arguments `ssh`, `shell`, `env` and `cmdline_cookie`
146
    were added in Julia 1.6.
147

148
Environment variables:
149

150
If the master process fails to establish a connection with a newly launched worker within
151
60.0 seconds, the worker treats it as a fatal situation and terminates.
152
This timeout can be controlled via environment variable `JULIA_WORKER_TIMEOUT`.
153
The value of `JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a
154
newly launched worker waits for connection establishment.
155
"""
156
function addprocs(machines::AbstractVector; kwargs...)
    # Wrap the machine specifications in an SSHManager, validate the keywords
    # against that manager's defaults, then defer to the manager-based method.
    ssh_manager = SSHManager(machines)
    check_addprocs_args(ssh_manager, kwargs)
    return addprocs(ssh_manager; kwargs...)
end
161

162
# Default keyword parameters for SSH-launched workers: the generic defaults
# merged with the SSH-specific entries below (SSH entries win on conflict).
function default_addprocs_params(::SSHManager)
    generic = default_addprocs_params()
    ssh_specific = Dict{Symbol,Any}(
        :ssh            => "ssh",
        :sshflags       => ``,
        :shell          => :posix,
        :cmdline_cookie => false,
        :env            => [],
        :tunnel         => false,
        :multiplex      => false,
        :max_parallel   => 10,
    )
    return merge(generic, ssh_specific)
end
173

174
function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
    # Start one launch task per unique host and wait for all of them to finish.
    # Additional workers on the same host are launched later.
    # A failure on one host is reported on stderr and does not abort the others.
    @sync for (host, wcount) in manager.machines
        let host = host, wcount = wcount
            @async try
                launch_on_machine(manager, $host, $wcount, params, launched, launch_ntfy)
            catch err
                print(stderr, "exception launching on machine $(host) : $(err)\n")
            end
        end
    end
    notify(launch_ntfy)
end
188

189

190
# Compact textual form: `SSHManager(machines=<dict>)`.
function Base.show(io::IO, manager::SSHManager)
    print(io, "SSHManager(machines=", manager.machines, ")")
end
1✔
191

192

193
# Split a machine address of the form `host[:port]` or `[ipv6]:port`
# (RFC 2732 bracket notation) into `(host, port)`.
#
# Returns a tuple `(hoststr, portnum)`; `portnum` is `nothing` when no port is
# given, otherwise an `Int` in `1:65535`.
#
# Throws `ArgumentError` when the string is empty, an opening `[` has no
# closing `]`, text other than `:port` follows the `]`, more than one `:`
# separator is present, or the port is not an integer in the valid range.
function parse_machine(machine::AbstractString)
    # Guard first: indexing `machine[begin]` on an empty string would raise a
    # BoundsError rather than the ArgumentError callers expect for bad input.
    if isempty(machine)
        throw(ArgumentError("invalid machine definition format string: empty host \"$machine\""))
    end

    hoststr = ""
    portnum = nothing

    if machine[begin] == '['  # ipv6 bracket notation (RFC 2732)
        ipv6_end = findlast(']', machine)
        if ipv6_end === nothing
            throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine\""))
        end
        hoststr = machine[begin+1 : prevind(machine, ipv6_end)]
        # What remains is "]" optionally followed by ":port".
        machine_def = split(machine[ipv6_end : end], ':')
        if machine_def[1] != "]"
            # Anything between ']' and ':' used to be dropped silently.
            throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine\""))
        end
    else    # ipv4
        machine_def = split(machine, ':')
        hoststr = machine_def[1]
    end

    if length(machine_def) > 2
        throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine_def\""))
    end

    if length(machine_def) == 2
        portstr = machine_def[2]

        portnum = tryparse(Int, portstr)
        if portnum === nothing
            msg = "invalid machine definition format string: invalid port format \"$machine_def\""
            throw(ArgumentError(msg))
        end

        if portnum < 1 || portnum > 65535
            msg = "invalid machine definition format string: invalid port number \"$machine_def\""
            throw(ArgumentError(msg))
        end
    end
    (hoststr, portnum)
end
229

230
# Launch a worker on `machine` over SSH and register it in `launched`.
#
# `machine` has the form `[user@]host[:port] [bind_addr[:bind_port]]`; `cnt` is
# stored in the resulting `WorkerConfig` as the worker count for this host.
# `params` supplies the addprocs parameters read below (`:shell`, `:ssh`, `:dir`,
# `:exename`, `:exeflags`, `:tunnel`, `:multiplex`, `:cmdline_cookie`, `:env`,
# `:sshflags`, `:max_parallel`, `:enable_threaded_blas`).
#
# On success one `WorkerConfig` is pushed onto `launched` and `launch_ntfy` is
# notified. Throws `ArgumentError` for an empty machine string, an unknown
# `:shell`, an invalid environment variable name, or (for `:wincmd`) a double
# quote in `exename` / `dir`.
function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, params::Dict, launched::Array, launch_ntfy::Condition)
    shell = params[:shell]
    ssh = params[:ssh]
    dir = params[:dir]
    exename = params[:exename]
    exeflags = params[:exeflags]
    tunnel = params[:tunnel]
    multiplex = params[:multiplex]
    cmdline_cookie = params[:cmdline_cookie]
    env = Dict{String,String}(params[:env])

    # machine could be of the format [user@]host[:port] bind_addr[:bind_port]
    # machine format string is split on whitespace
    machine_bind = split(machine)
    if isempty(machine_bind)
        # The closing quote was previously written as `\$`, which printed a
        # literal `$` and left the quoted machine string unterminated.
        throw(ArgumentError("invalid machine definition format string: \"$machine\""))
    end
    if length(machine_bind) > 1
        exeflags = `--bind-to $(machine_bind[2]) $exeflags`
    end
    if cmdline_cookie
        exeflags = `$exeflags --worker=$(cluster_cookie())`
    else
        exeflags = `$exeflags --worker`
    end

    host, portnum = parse_machine(machine_bind[1])
    portopt = portnum === nothing ? `` : `-p $portnum`
    sshflags = `$(params[:sshflags]) $portopt`

    if tunnel
        # First it checks if ssh multiplexing has been already enabled and the master process is running.
        # If it's already running, later ssh sessions also use the same ssh multiplexing session even if
        # `multiplex` is not explicitly specified; otherwise the tunneling session launched later won't
        # go to background and hang. This is because of OpenSSH implementation.
        if success(`$ssh $sshflags -O check $host`)
            multiplex = true
        elseif multiplex
            # automatically create an SSH multiplexing session at the next SSH connection
            controlpath = "~/.ssh/julia-%r@%h:%p"
            sshflags = `$sshflags -o ControlMaster=auto -o ControlPath=$controlpath -o ControlPersist=no`
        end
    end

    # Build up the ssh command

    # pass on some environment variables by default
    for var in ["JULIA_WORKER_TIMEOUT"]
        if !haskey(env, var) && haskey(ENV, var)
            env[var] = ENV[var]
        end
    end

    # Julia process with passed in command line flag arguments
    if shell === :posix
        # ssh connects to a POSIX shell

        cmds = "exec $(shell_escape_posixly(exename)) $(shell_escape_posixly(exeflags))"
        # set environment variables
        for (var, val) in env
            occursin(r"^[a-zA-Z_][a-zA-Z_0-9]*\z", var) ||
                throw(ArgumentError("invalid env key $var"))
            cmds = "export $(var)=$(shell_escape_posixly(val))\n$cmds"
        end
        # change working directory
        cmds = "cd -- $(shell_escape_posixly(dir))\n$cmds"

        # shell login (-l) with string command (-c) to launch julia process
        remotecmd = shell_escape_posixly(`sh -l -c $cmds`)

    elseif shell === :csh
        # ssh connects to (t)csh

        remotecmd = "exec $(shell_escape_csh(exename)) $(shell_escape_csh(exeflags))"

        # set environment variables
        for (var, val) in env
            occursin(r"^[a-zA-Z_][a-zA-Z_0-9]*\z", var) ||
                throw(ArgumentError("invalid env key $var"))
            remotecmd = "setenv $(var) $(shell_escape_csh(val))\n$remotecmd"
        end
        # change working directory
        if dir !== nothing && dir != ""
            remotecmd = "cd $(shell_escape_csh(dir))\n$remotecmd"
        end

    elseif shell === :wincmd
        # ssh connects to Windows cmd.exe

        any(c -> c == '"', exename) && throw(ArgumentError("invalid exename"))

        remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...))
        # change working directory
        if dir !== nothing && dir != ""
            any(c -> c == '"', dir) && throw(ArgumentError("invalid dir"))
            remotecmd = "pushd \"$(dir)\" && $remotecmd"
        end
        # set environment variables
        for (var, val) in env
            occursin(r"^[a-zA-Z0-9_()[\]{}\$\\/#',;\.@!?*+-]+\z", var) || throw(ArgumentError("invalid env key $var"))
            remotecmd = "set $(var)=$(shell_escape_wincmd(val))&& $remotecmd"
        end

    else
        throw(ArgumentError("invalid shell"))
    end

    # remote launch with ssh with given ssh flags / host / port information
    # -T → disable pseudo-terminal allocation
    # -a → disable forwarding of auth agent connection
    # -x → disable X11 forwarding
    # -o ClearAllForwardings → option if forwarding connections and
    #                          forwarded connections are causing collisions
    cmd = `$ssh -T -a -x -o ClearAllForwardings=yes $sshflags $host $remotecmd`

    # launch the remote Julia process

    # detach launches the command in a new process group, allowing it to outlive
    # the initial julia process (Ctrl-C and teardown methods are handled through messages)
    # for the launched processes.
    io = open(detach(cmd), "r+")
    # Unless the cookie already went on the command line, send it over the ssh stdio.
    cmdline_cookie || write_cookie(io)

    wconfig = WorkerConfig()
    wconfig.io = io.out
    wconfig.host = host
    wconfig.tunnel = tunnel
    wconfig.multiplex = multiplex
    wconfig.sshflags = sshflags
    wconfig.exeflags = exeflags
    wconfig.exename = exename
    wconfig.count = cnt
    wconfig.max_parallel = params[:max_parallel]
    wconfig.enable_threaded_blas = params[:enable_threaded_blas]

    push!(launched, wconfig)
    notify(launch_ntfy)
end
369

370

371
# Handle management operations for an SSH-launched worker. Only `:interrupt`
# is acted upon: a `kill -2` is run on the worker's host over ssh, targeting
# the worker's OS process id. Every other `op` is ignored.
function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symbol)
    id = Int(id)
    op === :interrupt || return nothing

    ospid = config.ospid
    if ospid === nothing
        # This state can happen immediately after an addprocs
        @error "Worker $id cannot be presently interrupted."
        return nothing
    end

    host = notnothing(config.host)
    sshflags = notnothing(config.sshflags)
    delivered = success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`)
    if !delivered
        @error "Error sending a Ctrl-C to julia worker $id on $host"
    end
    return nothing
end
387

388
# Hands out candidate local ports for SSH tunnels: starts at 9201, counts up
# by one per call, and wraps back to 9201 after a value above 32000 has been
# handed out. The counter lives in the enclosing `let` and is shared by all callers.
let tunnel_port = 9201
    global next_tunnel_port
    function next_tunnel_port()
        current = tunnel_port
        tunnel_port = current > 32000 ? 9201 : current + 1
        return current
    end
end
400

401

402
"""
403
    ssh_tunnel(user, host, bind_addr, port, sshflags, multiplex) -> localport
404

405
Establish an SSH tunnel to a remote worker.
406
Return a port number `localport` such that `localhost:localport` connects to `host:port`.
407
"""
408
function ssh_tunnel(user, host, bind_addr, port, sshflags, multiplex)
    port = Int(port)
    ntries = 100

    # Each attempt takes a fresh candidate `localport` and asks ssh to forward it
    # to `bind_addr:port` on the remote side; the first success wins.
    for _ in 1:ntries
        localport = next_tunnel_port()
        cmd = if multiplex
            # It assumes that an ssh multiplexing session has been already started by the remote worker.
            `ssh $sshflags -O forward -L $localport:$bind_addr:$port $user@$host`
        else
            # if we cannot do port forwarding, fail immediately
            # the -f option backgrounds the ssh session
            # `sleep 60` gives the remote julia process 60 seconds to start and to
            # establish the network connections specified by the process topology.
            # If no connections are made within 60 seconds, ssh will exit and an error
            # will be printed on the process that launched the remote process.
            ssh = `ssh -T -a -x -o ExitOnForwardFailure=yes`
            detach(`$ssh -f $sshflags $user@$host -L $localport:$bind_addr:$port sleep 60`)
        end
        success(cmd) && return localport
    end

    error("unable to create SSH tunnel after ", ntries, " tries. No free port?")
end
437

438

439
# LocalManager
440
# Cluster manager for workers started on the local host.
struct LocalManager <: ClusterManager
    # Number of local workers to launch.
    np::Int
    # Restrict binding to 127.0.0.1 only
    restrict::Bool
end
444

445
"""
446
    addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
447

448
Launch `np` workers on the local host using the in-built `LocalManager`.
449

450
Local workers inherit the current package environment (i.e., active project,
451
[`LOAD_PATH`](@ref), and [`DEPOT_PATH`](@ref)) from the main process.
452

453
**Keyword arguments**:
454
 - `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`.
455
 - `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect
456
   as for `SSHManager`, see documentation for [`addprocs(machines::AbstractVector)`](@ref).
457

458
!!! compat "Julia 1.9"
459
    The inheriting of the package environment and the `env` keyword argument were
460
    added in Julia 1.9.
461
"""
462
function addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...)
    # Build the local manager, validate the remaining keywords against its
    # defaults, then defer to the manager-based method.
    local_manager = LocalManager(np, restrict)
    check_addprocs_args(local_manager, kwargs)
    return addprocs(local_manager; kwargs...)
end
467

468
# Always prints the fixed text `LocalManager()`; the fields are not shown.
function Base.show(io::IO, manager::LocalManager)
    print(io, "LocalManager()")
end
1✔
469

470
# Start `manager.np` worker processes on the local host, push one `WorkerConfig`
# per process onto `launched`, and notify `c` once all have been started.
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    dir = params[:dir]
    exename = params[:exename]
    exeflags = params[:exeflags]
    bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
    env = Dict{String,String}(params[:env])

    # TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function
    #       together with load_path() etc. Might be useful to have when spawning julia
    #       processes outside of Distributed.jl too.
    # JULIA_(LOAD|DEPOT)_PATH are used to populate (LOAD|DEPOT)_PATH on startup,
    # but since (LOAD|DEPOT)_PATH might have changed they are re-serialized here.
    # Users can opt-out of this by passing `env = ...` to addprocs(...).
    pathsep = Sys.iswindows() ? ";" : ":"
    if !haskey(env, "JULIA_LOAD_PATH")
        env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep)
    end
    if !haskey(env, "JULIA_DEPOT_PATH")
        env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep)
    end

    # If we haven't explicitly asked for threaded BLAS, prevent OpenBLAS from starting
    # up with multiple threads, thereby sucking up a bunch of wasted memory on Windows.
    if !params[:enable_threaded_blas] && !haskey(env, "OPENBLAS_NUM_THREADS")
        env["OPENBLAS_NUM_THREADS"] = "1"
    end

    # Set the active project on workers using JULIA_PROJECT.
    # Users can opt-out of this by (i) passing `env = ...` or (ii) passing
    # `--project=...` as `exeflags` to addprocs(...).
    project = Base.ACTIVE_PROJECT[]
    if project !== nothing && !haskey(env, "JULIA_PROJECT")
        env["JULIA_PROJECT"] = project
    end

    for _ in 1:manager.np
        cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
        io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
        write_cookie(io)

        wconfig = WorkerConfig()
        wconfig.process = io
        wconfig.io = io.out
        wconfig.enable_threaded_blas = params[:enable_threaded_blas]
        push!(launched, wconfig)
    end

    notify(c)
end
519

520
# Handle management operations for a local worker. Only `:interrupt` is acted
# upon, by sending signal 2 to the worker process; every other `op` is ignored.
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    op === :interrupt || return nothing
    kill(config.process, 2)
end
525

526
"""
527
    launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
528

529
Implemented by cluster managers. For every Julia worker launched by this function, it should
530
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
531
once all workers requested by `manager` have been launched. `params` is a dictionary of all
532
keyword arguments [`addprocs`](@ref) was called with.
533
"""
534
launch
535

536
"""
537
    manage(manager::ClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
538

539
Implemented by cluster managers. It is called on the master process, during a worker's
540
lifetime, with appropriate `op` values:
541

542
- with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
543
- with `:interrupt` when `interrupt(workers)` is called. The `ClusterManager`
544
  should signal the appropriate worker with an interrupt signal.
545
- with `:finalize` for cleanup purposes.
546
"""
547
manage
548

549
# DefaultClusterManager for the default TCP transport - used by both SSHManager and LocalManager
550

551
# Field-less manager type for the default TCP transport.
struct DefaultClusterManager <: ClusterManager end

# One semaphore per tunnelled host name; `connect` acquires it around tunnel
# setup so that at most `max_parallel` setups run against a host at a time.
const tunnel_hosts_map = Dict{String, Semaphore}()
555

556
"""
557
    connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
558

559
Implemented by cluster managers using custom transports. It should establish a logical
560
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
561
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
562
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
563
ensure that messages are delivered and received completely and in order.
564
`connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
565
workers.
566
"""
567
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
    # A populated `connect_at` marks a worker-to-worker setup call.
    config.connect_at === nothing || return connect_w2w(pid, config)

    # master connecting to workers
    local port::Int
    local sock
    if config.io === nothing
        # No launch stream: host and port must already be present in the config.
        pubhost = notnothing(config.host)
        port = notnothing(config.port)
        bind_addr = something(config.bind_addr, pubhost)
    else
        # The worker announces its bind address and port on its output stream.
        (bind_addr, port) = read_worker_host_port(config.io)
        pubhost = something(config.host, bind_addr)
        config.host = pubhost
        config.port = port
    end

    tunnel = something(config.tunnel, false)

    # Split an optional `user@` prefix off the host; otherwise fall back to $USER.
    hostparts = split(pubhost, '@')
    user = ""
    if length(hostparts) > 1
        user = hostparts[1]
        pubhost = hostparts[2]
    elseif haskey(ENV, "USER")
        user = ENV["USER"]
    elseif tunnel
        error("USER must be specified either in the environment ",
              "or as part of the hostname when tunnel option is used")
    end

    if tunnel
        # One semaphore per host, created on first use.
        sem = get!(tunnel_hosts_map, pubhost) do
            Semaphore(something(config.max_parallel, typemax(Int)))
        end

        sshflags = notnothing(config.sshflags)
        multiplex = something(config.multiplex, false)
        acquire(sem)
        try
            (sock, bind_addr, forward) = connect_to_worker_with_tunnel(pubhost, bind_addr, port, user, sshflags, multiplex)
            config.forward = forward
        finally
            release(sem)
        end
    else
        (sock, bind_addr) = connect_to_worker(bind_addr, port)
    end

    config.bind_addr = bind_addr

    # write out a subset of the connect_at required for further worker-worker connection setups
    config.connect_at = (bind_addr, port)

    if config.io !== nothing
        let pid = pid
            redirect_worker_output(pid, notnothing(config.io))
        end
    end

    # The same socket serves as both the input and the output stream.
    (sock, sock)
end
633

634
# Worker-to-worker connection: connect to the `(host, port)` pair stored in
# `config.connect_at`, record it in `config`, and return the socket as both
# the input and the output stream.
function connect_w2w(pid::Int, config::WorkerConfig)
    (rhost, rport) = notnothing(config.connect_at)::Tuple{String, Int}
    config.host = rhost
    config.port = rport
    sock, _ = connect_to_worker(rhost, rport)
    return (sock, sock)
end
641

642
# Client-side port shared by outgoing worker connections. 0 until the first
# successful bind in `bind_client_port`, which then stores the port it got.
const client_port = Ref{UInt16}(0)

# Return a TCP socket for an outgoing connection, bound to the shared client
# port with port reuse enabled where the runtime supports it; otherwise (or if
# enabling reuse fails) return a plain `TCPSocket()`.
function socket_reuse_port(iptype)
    ccall(:jl_has_so_reuseport, Int32, ()) == 1 || return TCPSocket()

    sock = TCPSocket(delay = false)

    # Some systems (e.g. Linux) require the port to be bound before setting REUSEPORT
    bind_early = Sys.islinux()
    if bind_early
        bind_client_port(sock, iptype)
    end

    rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), sock.handle)
    if rc < 0
        close(sock)

        # This is an issue only on systems with lots of client connections, hence delay the warning
        nworkers() > 128 && @warn "Error trying to reuse client port number, falling back to regular socket" maxlog=1

        # provide a clean new socket
        return TCPSocket()
    end

    if !bind_early
        bind_client_port(sock, iptype)
    end
    return sock
end
668

669
# Bind `sock` to address `iptype(0)` on the shared client port. When the bind
# succeeds, the port actually assigned is stored back into `client_port`.
# Returns `sock` either way.
function bind_client_port(sock::TCPSocket, iptype)
    any_addr = iptype(0)
    bound = Sockets.bind(sock, any_addr, client_port[])
    if bound
        client_port[] = getsockname(sock)[2]
    end
    return sock
end
677

678
# Open a TCP connection to `host:port` and return `(socket, address_string)`.
function connect_to_worker(host::AbstractString, port::Integer)
    # Avoid calling getaddrinfo if possible - involves a DNS lookup
    # host may be a stringified ipv4 / ipv6 address or a dns name
    addr = try
        parse(IPAddr, host)
    catch
        getaddrinfo(host)
    end

    # Choose the client socket for the address family that was resolved.
    sock = socket_reuse_port(typeof(addr))
    connect(sock, addr, UInt16(port))

    return (sock, string(addr))
end
694

695

696
# Connect to a worker through an SSH tunnel. Returns `(socket, bind_addr, forward)`
# where `forward` is the `localport:bind_addr:port` forwarding specification.
function connect_to_worker_with_tunnel(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags, multiplex)
    localport = ssh_tunnel(tunnel_user, host, bind_addr, UInt16(port), sshflags, multiplex)
    sock = connect("localhost", localport)
    forward = string(localport, ':', bind_addr, ':', port)
    return (sock, bind_addr, forward)
end
702

703

704
# Cancel the port forwarding recorded in `config.forward`. Only acts when the
# worker was connected through a tunnel with multiplexing enabled.
function cancel_ssh_tunnel(config::WorkerConfig)
    host = notnothing(config.host)
    sshflags = notnothing(config.sshflags)
    tunneled = something(config.tunnel, false)
    multiplexed = something(config.multiplex, false)
    (tunneled && multiplexed) || return nothing

    forward = notnothing(config.forward)
    run(`ssh $sshflags -O cancel -L $forward $host`)
end
714

715

716
"""
717
    kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
718

719
Implemented by cluster managers.
720
It is called on the master process, by [`rmprocs`](@ref).
721
It should cause the remote worker specified by `pid` to exit.
722
`kill(manager::ClusterManager.....)` executes a remote `exit()`
723
on `pid`.
724
"""
725
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
    # Ask the worker to run `exit()`; the call does not wait for a result.
    remote_do(exit, pid)
    return nothing
end
729

730
# SSH variant: ask the worker to exit, then cancel its SSH port forwarding.
function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
    remote_do(exit, pid)
    cancel_ssh_tunnel(config)
    return nothing
end
735

736
# Local variant: ask the worker to exit, then escalate in the background.
# If the process is still alive `exit_timeout` seconds later it gets SIGTERM,
# and if it survives a further `term_timeout` seconds it gets SIGKILL.
# Returns immediately; the escalation runs in a monitored task.
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
    # First, try sending `exit()` to the remote over the usual control channels
    remote_do(exit, pid)

    still_alive() = !process_exited(config.process)

    watchdog = @async begin
        sleep(exit_timeout)

        # Check to see if our child exited, and if not, send an actual kill signal
        if still_alive()
            @warn("Failed to gracefully kill worker $(pid), sending SIGTERM")
            kill(config.process, Base.SIGTERM)

            sleep(term_timeout)
            if still_alive()
                @warn("Worker $(pid) ignored SIGTERM, sending SIGKILL")
                kill(config.process, Base.SIGKILL)
            end
        end
    end
    errormonitor(watchdog)
    return nothing
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc