
JuliaLang / julia · build #37527 (push · local · web-flow) · pending completion
make `IRShow.method_name` inferrable (#49607)

18 of 18 new or added lines in 3 files covered. (100.0%)
68710 of 81829 relevant lines covered (83.97%)
33068903.12 hits per line

Source File

/stdlib/Distributed/src/managers.jl (37.87% covered)
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
# Built-in SSH and Local Managers
4

5
struct SSHManager <: ClusterManager
6
    machines::Dict
7

8
    function SSHManager(machines)
1✔
9
        # machines => array of machine elements
10
        # machine => address or (address, cnt)
11
        # address => string of form `[user@]host[:port] bind_addr[:bind_port]`
12
        # cnt => :auto or number
13
        # :auto launches NUM_CORES number of workers at address
14
        # number launches the specified number of workers at address
15
        mhist = Dict()
1✔
16
        for m in machines
2✔
17
            if isa(m, Tuple)
×
18
                host=m[1]
×
19
                cnt=m[2]
×
20
            else
21
                host=m
×
22
                cnt=1
×
23
            end
24
            current_cnt = get(mhist, host, 0)
×
25

26
            if isa(cnt, Number)
×
27
                mhist[host] = isa(current_cnt, Number) ? current_cnt + Int(cnt) : Int(cnt)
×
28
            else
29
                mhist[host] = cnt
×
30
            end
31
        end
×
32
        new(mhist)
1✔
33
    end
34
end
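
As an illustration of the bookkeeping in the constructor above (you would normally not build an `SSHManager` directly; `addprocs(machines)` does it for you), a mixed machine specification folds into the `machines` Dict like this. Hostnames are placeholders:

```julia
using Distributed

# plain strings count as one worker; tuples carry an explicit count or :auto
mgr = Distributed.SSHManager(["node1", ("node1", 2), ("user@node2", :auto)])
mgr.machines   # Dict("node1" => 3, "user@node2" => :auto)
```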
35

36

37
function check_addprocs_args(manager, kwargs)
47✔
38
    valid_kw_names = keys(default_addprocs_params(manager))
47✔
39
    for keyname in keys(kwargs)
47✔
40
        !(keyname in valid_kw_names) && throw(ArgumentError("Invalid keyword argument $(keyname)"))
78✔
41
    end
78✔
42
end
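
For example, a misspelled keyword is rejected by the check above before anything is launched (the typo is hypothetical; the error text comes from the line above):

```julia
using Distributed

addprocs(2; exflags = `--threads=2`)   # throws ArgumentError("Invalid keyword argument exflags")
```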
43

44
# SSHManager
45

46
# start and connect to processes via SSH, optionally through an SSH tunnel.
47
# the tunnel is only used from the head (process 1); the nodes are assumed
48
# to be mutually reachable without a tunnel, as is often the case in a cluster.
49
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config
50
# A machine is either a <hostname> or a tuple of (<hostname>, count)
51
"""
52
    addprocs(machines; tunnel=false, sshflags=\`\`, max_parallel=10, kwargs...) -> List of process identifiers
53

54
Add worker processes on remote machines via SSH. Configuration is done with keyword
55
arguments (see below). In particular, the `exename` keyword can be used to specify
56
the path to the `julia` binary on the remote machine(s).
57

58
`machines` is a vector of "machine specifications" which are given as strings of
59
the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to the current user and `port`
60
to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect
61
to this worker at the specified `bind_addr` and `port`.
62

63
It is possible to launch multiple processes on a remote host by using a tuple in the
64
`machines` vector of the form `(machine_spec, count)`, where `count` is the number of
65
workers to be launched on the specified host. Passing `:auto` as the worker count will
66
launch as many workers as the number of CPU threads on the remote host.
67

68
**Examples**:
69
```julia
70
addprocs([
71
    "remote1",               # one worker on 'remote1' logging in with the current username
72
    "user@remote2",          # one worker on 'remote2' logging in with the 'user' username
73
    "user@remote3:2222",     # specifying SSH port to '2222' for 'remote3'
74
    ("user@remote4", 4),     # launch 4 workers on 'remote4'
75
    ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
76
])
77
```
78

79
**Keyword arguments**:
80

81
* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
82
  master process. Default is `false`.
83

84
* `multiplex`: if `true` then SSH multiplexing is used for SSH tunneling. Default is `false`.
85

86
* `ssh`: the name or path of the SSH client executable used to start the workers.
87
  Default is `"ssh"`.
88

89
* `sshflags`: specifies additional ssh options, e.g. ``` sshflags=\`-i /home/foo/bar.pem\` ```
90

91
* `max_parallel`: specifies the maximum number of workers connected to in parallel at a
92
  host. Defaults to 10.
93

94
* `shell`: specifies the type of shell to which ssh connects on the workers.
95

96
    + `shell=:posix`: a POSIX-compatible Unix/Linux shell
97
      (sh, ksh, bash, dash, zsh, etc.). The default.
98

99
    + `shell=:csh`: a Unix C shell (csh, tcsh).
100

101
    + `shell=:wincmd`: Microsoft Windows `cmd.exe`.
102

103
* `dir`: specifies the working directory on the workers. Defaults to the host's current
104
  directory (as found by `pwd()`)
105

106
* `enable_threaded_blas`: if `true` then BLAS will run on multiple threads in added
107
  processes. Default is `false`.
108

109
* `exename`: name of the `julia` executable. Defaults to `"\$(Sys.BINDIR)/julia"` or
110
  `"\$(Sys.BINDIR)/julia-debug"` as the case may be. It is recommended that a common Julia
111
  version is used on all remote machines because serialization and code distribution might
112
  fail otherwise.
113

114
* `exeflags`: additional flags passed to the worker processes.
115

116
* `topology`: Specifies how the workers connect to each other. Sending a message between
117
  unconnected workers results in an error.
118

119
    + `topology=:all_to_all`: All processes are connected to each other. The default.
120

121
    + `topology=:master_worker`: Only the driver process, i.e. `pid` 1, connects to the
122
      workers. The workers do not connect to each other.
123

124
    + `topology=:custom`: The `launch` method of the cluster manager specifies the
125
      connection topology via fields `ident` and `connect_idents` in `WorkerConfig`.
126
      A worker with a cluster manager identity `ident` will connect to all workers specified
127
      in `connect_idents`.
128

129
* `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections
130
  are set up lazily, i.e. they are set up at the first instance of a remote call between
131
  workers. Default is `true`.
132

133
* `env`: provide an array of string pairs such as
134
  `env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables
135
  are set on the remote machine. By default only the environment variable
136
  `JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote
137
  environment.
138

139
* `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline
140
   option. The (more secure) default behaviour of passing the cookie via ssh stdio
141
   may hang with Windows workers that use older (pre-ConPTY) Julia or Windows versions,
142
   in which case `cmdline_cookie=true` offers a work-around.
143

144
!!! compat "Julia 1.6"
145
    The keyword arguments `ssh`, `shell`, `env` and `cmdline_cookie`
146
    were added in Julia 1.6.
147

148
Environment variables:
149

150
If the master process fails to establish a connection with a newly launched worker within
151
60.0 seconds, the worker treats it as a fatal situation and terminates.
152
This timeout can be controlled via environment variable `JULIA_WORKER_TIMEOUT`.
153
The value of `JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a
154
newly launched worker waits for connection establishment.
155
"""
156
function addprocs(machines::AbstractVector; kwargs...)
2✔
157
    manager = SSHManager(machines)
1✔
158
    check_addprocs_args(manager, kwargs)
1✔
159
    addprocs(manager; kwargs...)
1✔
160
end
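
A hedged usage sketch tying several of the documented keywords together; the hostnames, user name, key path, and remote Julia path are placeholders, not tested machines:

```julia
using Distributed

# two workers on gpu1, and as many workers as CPU threads on gpu2,
# tunnelled through SSH from the master process
addprocs([("user@gpu1", 2), ("user@gpu2", :auto)];
         tunnel   = true,
         sshflags = `-i /home/user/.ssh/id_ed25519`,
         exename  = "/opt/julia/bin/julia",
         dir      = "/home/user/project",
         exeflags = `--threads=4`,
         env      = ["JULIA_WORKER_TIMEOUT" => "120"])
```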
161

162
default_addprocs_params(::SSHManager) =
2✔
163
    merge(default_addprocs_params(),
164
          Dict{Symbol,Any}(
165
              :ssh            => "ssh",
166
              :sshflags       => ``,
167
              :shell          => :posix,
168
              :cmdline_cookie => false,
169
              :env            => [],
170
              :tunnel         => false,
171
              :multiplex      => false,
172
              :max_parallel   => 10))
173

174
function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
1✔
175
    # Launch one worker on each unique host in parallel. Additional workers are launched later.
176
    # Wait for all launches to complete.
177
    @sync for (i, (machine, cnt)) in enumerate(manager.machines)
1✔
178
        let machine=machine, cnt=cnt
×
179
             @async try
×
180
                launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
181
            catch e
182
                print(stderr, "exception launching on machine $(machine) : $(e)\n")
183
            end
184
        end
185
    end
×
186
    notify(launch_ntfy)
1✔
187
end
188

189

190
Base.show(io::IO, manager::SSHManager) = print(io, "SSHManager(machines=", manager.machines, ")")
×
191

192

193
function parse_machine(machine::AbstractString)
×
194
    hoststr = ""
×
195
    portnum = nothing
×
196

197
    if machine[begin] == '['  # ipv6 bracket notation (RFC 2732)
×
198
        ipv6_end = findlast(']', machine)
×
199
        if ipv6_end === nothing
×
200
            throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine\""))
×
201
        end
202
        hoststr = machine[begin+1 : prevind(machine,ipv6_end)]
×
203
        machine_def = split(machine[ipv6_end : end], ':')
×
204
    else    # ipv4
205
        machine_def = split(machine, ':')
×
206
        hoststr = machine_def[1]
×
207
    end
208

209
    if length(machine_def) > 2
×
210
        throw(ArgumentError("invalid machine definition format string: invalid port format \"$machine_def\""))
×
211
    end
212

213
    if length(machine_def) == 2
×
214
        portstr = machine_def[2]
×
215

216
        portnum = tryparse(Int, portstr)
×
217
        if portnum === nothing
×
218
            msg = "invalid machine definition format string: invalid port format \"$machine_def\""
×
219
            throw(ArgumentError(msg))
×
220
        end
221

222
        if portnum < 1 || portnum > 65535
×
223
            msg = "invalid machine definition format string: invalid port number \"$machine_def\""
×
224
            throw(ArgumentError(msg))
×
225
        end
226
    end
227
    (hoststr, portnum)
×
228
end
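
For reference, a few illustrative inputs and the `(hoststr, portnum)` pairs the helper above produces (`parse_machine` is internal and unexported; hostnames are placeholders):

```julia
Distributed.parse_machine("remote1")             # ("remote1", nothing)
Distributed.parse_machine("remote1:2222")        # ("remote1", 2222)
Distributed.parse_machine("[2001:db8::1]:2222")  # ("2001:db8::1", 2222)
Distributed.parse_machine("remote1:99999")       # throws ArgumentError (port out of range)
```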
229

230
function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, params::Dict, launched::Array, launch_ntfy::Condition)
×
231
    shell = params[:shell]
×
232
    ssh = params[:ssh]
×
233
    dir = params[:dir]
×
234
    exename = params[:exename]
×
235
    exeflags = params[:exeflags]
×
236
    tunnel = params[:tunnel]
×
237
    multiplex = params[:multiplex]
×
238
    cmdline_cookie = params[:cmdline_cookie]
×
239
    env = Dict{String,String}(params[:env])
×
240

241
    # machine could be of the format [user@]host[:port] bind_addr[:bind_port]
242
    # machine format string is split on whitespace
243
    machine_bind = split(machine)
×
244
    if isempty(machine_bind)
×
245
        throw(ArgumentError("invalid machine definition format string: \"$machine\$"))
×
246
    end
247
    if length(machine_bind) > 1
×
248
        exeflags = `--bind-to $(machine_bind[2]) $exeflags`
×
249
    end
250
    if cmdline_cookie
×
251
        exeflags = `$exeflags --worker=$(cluster_cookie())`
×
252
    else
253
        exeflags = `$exeflags --worker`
×
254
    end
255

256
    host, portnum = parse_machine(machine_bind[1])
×
257
    portopt = portnum === nothing ? `` : `-p $portnum`
×
258
    sshflags = `$(params[:sshflags]) $portopt`
×
259

260
    if tunnel
×
261
        # First, check whether ssh multiplexing has already been enabled and its control master process is running.
262
        # If it's already running, later ssh sessions also use the same ssh multiplexing session even if
263
        # `multiplex` is not explicitly specified; otherwise the tunneling session launched later won't
264
        # go to the background and will hang. This is due to the OpenSSH implementation.
265
        if success(`$ssh $sshflags -O check $host`)
×
266
            multiplex = true
×
267
        elseif multiplex
×
268
            # automatically create an SSH multiplexing session at the next SSH connection
269
            controlpath = "~/.ssh/julia-%r@%h:%p"
×
270
            sshflags = `$sshflags -o ControlMaster=auto -o ControlPath=$controlpath -o ControlPersist=no`
×
271
        end
272
    end
273

274
    # Build up the ssh command
275

276
    # pass on some environment variables by default
277
    for var in ["JULIA_WORKER_TIMEOUT"]
×
278
        if !haskey(env, var) && haskey(ENV, var)
×
279
            env[var] = ENV[var]
×
280
        end
281
    end
×
282

283
    # Julia process with passed in command line flag arguments
284
    if shell === :posix
×
285
        # ssh connects to a POSIX shell
286

287
        cmds = "exec $(shell_escape_posixly(exename)) $(shell_escape_posixly(exeflags))"
×
288
        # set environment variables
289
        for (var, val) in env
×
290
            occursin(r"^[a-zA-Z_][a-zA-Z_0-9]*\z", var) ||
×
291
                throw(ArgumentError("invalid env key $var"))
292
            cmds = "export $(var)=$(shell_escape_posixly(val))\n$cmds"
×
293
        end
×
294
        # change working directory
295
        cmds = "cd -- $(shell_escape_posixly(dir))\n$cmds"
×
296

297
        # shell login (-l) with string command (-c) to launch julia process
298
        remotecmd = shell_escape_posixly(`sh -l -c $cmds`)
×
299

300
    elseif shell === :csh
×
301
        # ssh connects to (t)csh
302

303
        remotecmd = "exec $(shell_escape_csh(exename)) $(shell_escape_csh(exeflags))"
×
304

305
        # set environment variables
306
        for (var, val) in env
×
307
            occursin(r"^[a-zA-Z_][a-zA-Z_0-9]*\z", var) ||
×
308
                throw(ArgumentError("invalid env key $var"))
309
            remotecmd = "setenv $(var) $(shell_escape_csh(val))\n$remotecmd"
×
310
        end
×
311
        # change working directory
312
        if dir !== nothing && dir != ""
×
313
            remotecmd = "cd $(shell_escape_csh(dir))\n$remotecmd"
×
314
        end
315

316
    elseif shell === :wincmd
×
317
        # ssh connects to Windows cmd.exe
318

319
        any(c -> c == '"', exename) && throw(ArgumentError("invalid exename"))
×
320

321
        remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...))
×
322
        # change working directory
323
        if dir !== nothing && dir != ""
×
324
            any(c -> c == '"', dir) && throw(ArgumentError("invalid dir"))
×
325
            remotecmd = "pushd \"$(dir)\" && $remotecmd"
×
326
        end
327
        # set environment variables
328
        for (var, val) in env
×
329
            occursin(r"^[a-zA-Z0-9_()[\]{}\$\\/#',;\.@!?*+-]+\z", var) || throw(ArgumentError("invalid env key $var"))
×
330
            remotecmd = "set $(var)=$(shell_escape_wincmd(val))&& $remotecmd"
×
331
        end
×
332

333
    else
334
        throw(ArgumentError("invalid shell"))
×
335
    end
336

337
    # remote launch with ssh with given ssh flags / host / port information
338
    # -T → disable pseudo-terminal allocation
339
    # -a → disable forwarding of auth agent connection
340
    # -x → disable X11 forwarding
341
    # -o ClearAllForwardings → option if forwarding connections and
342
    #                          forwarded connections are causing collisions
343
    cmd = `$ssh -T -a -x -o ClearAllForwardings=yes $sshflags $host $remotecmd`
×
344

345
    # launch the remote Julia process
346

347
    # detach launches the command in a new process group, allowing it to outlive
348
    # the initial julia process (Ctrl-C and teardown methods are handled through messages)
349
    # for the launched processes.
350
    io = open(detach(cmd), "r+")
×
351
    cmdline_cookie || write_cookie(io)
×
352

353
    wconfig = WorkerConfig()
×
354
    wconfig.io = io.out
×
355
    wconfig.host = host
×
356
    wconfig.tunnel = tunnel
×
357
    wconfig.multiplex = multiplex
×
358
    wconfig.sshflags = sshflags
×
359
    wconfig.exeflags = exeflags
×
360
    wconfig.exename = exename
×
361
    wconfig.count = cnt
×
362
    wconfig.max_parallel = params[:max_parallel]
×
363
    wconfig.enable_threaded_blas = params[:enable_threaded_blas]
×
364

365

366
    push!(launched, wconfig)
×
367
    notify(launch_ntfy)
×
368
end
369

370

371
function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symbol)
×
372
    id = Int(id)
×
373
    if op === :interrupt
×
374
        ospid = config.ospid
×
375
        if ospid !== nothing
×
376
            host = notnothing(config.host)
×
377
            sshflags = notnothing(config.sshflags)
×
378
            if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`)
×
379
                @error "Error sending a Ctrl-C to julia worker $id on $host"
×
380
            end
381
        else
382
            # This state can happen immediately after an addprocs
383
            @error "Worker $id cannot be presently interrupted."
×
384
        end
385
    end
386
end
387

388
let tunnel_port = 9201
389
    global next_tunnel_port
390
    function next_tunnel_port()
×
391
        retval = tunnel_port
×
392
        if tunnel_port > 32000
×
393
            tunnel_port = 9201
×
394
        else
395
            tunnel_port += 1
×
396
        end
397
        retval
×
398
    end
399
end
400

401

402
"""
403
    ssh_tunnel(user, host, bind_addr, port, sshflags, multiplex) -> localport
404

405
Establish an SSH tunnel to a remote worker.
406
Return a port number `localport` such that `localhost:localport` connects to `host:port`.
407
"""
408
function ssh_tunnel(user, host, bind_addr, port, sshflags, multiplex)
×
409
    port = Int(port)
×
410
    cnt = ntries = 100
×
411

412
    # the connection is forwarded to `port` on the remote server over the local port `localport`
413
    while cnt > 0
×
414
        localport = next_tunnel_port()
×
415
        if multiplex
×
416
            # This assumes that an ssh multiplexing session has already been started when launching the remote worker.
417
            cmd = `ssh $sshflags -O forward -L $localport:$bind_addr:$port $user@$host`
×
418
        else
419
            # if we cannot do port forwarding, fail immediately
420
            # the -f option backgrounds the ssh session
421
            # `sleep 60` command specifies that an allotted time of 60 seconds is allowed to start the
422
            # remote julia process and establish the network connections specified by the process topology.
423
            # If no connections are made within 60 seconds, ssh will exit and an error will be printed on the
424
            # process that launched the remote process.
425
            ssh = `ssh -T -a -x -o ExitOnForwardFailure=yes`
×
426
            cmd = detach(`$ssh -f $sshflags $user@$host -L $localport:$bind_addr:$port sleep 60`)
×
427
        end
428
        if success(cmd)
×
429
            return localport
×
430
        end
431
        cnt -= 1
×
432
    end
×
433

434
    throw(ErrorException(
×
435
        string("unable to create SSH tunnel after ", ntries, " tries. No free port?")))
436
end
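
Conceptually, the returned port behaves as in this hedged sketch (user, host, and address are placeholders; `ssh_tunnel` is internal and is normally reached via `connect_to_worker_with_tunnel` further below):

```julia
using Distributed, Sockets

localport = Distributed.ssh_tunnel("user", "remote1", "10.0.0.5", 9009, ``, false)
sock = connect("localhost", localport)   # traffic is forwarded to 10.0.0.5:9009 behind remote1
```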
437

438

439
# LocalManager
440
struct LocalManager <: ClusterManager
441
    np::Int
46✔
442
    restrict::Bool  # Restrict binding to 127.0.0.1 only
443
end
444

445
"""
446
    addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
447

448
Launch `np` workers on the local host using the in-built `LocalManager`.
449

450
Local workers inherit the current package environment (i.e., active project,
451
[`LOAD_PATH`](@ref), and [`DEPOT_PATH`](@ref)) from the main process.
452

453
**Keyword arguments**:
454
 - `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`.
455
 - `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect
456
   as for `SSHManager`, see documentation for [`addprocs(machines::AbstractVector)`](@ref).
457

458
!!! compat "Julia 1.9"
459
    The inheriting of the package environment and the `env` keyword argument were
460
    added in Julia 1.9.
461
"""
462
function addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...)
92✔
463
    manager = LocalManager(np, restrict)
46✔
464
    check_addprocs_args(manager, kwargs)
46✔
465
    addprocs(manager; kwargs...)
46✔
466
end
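
A minimal sketch of the local method above (the worker flags are just an example):

```julia
using Distributed

addprocs(4; exeflags = `--threads=2`)
nworkers()   # 4
```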
467

468
Base.show(io::IO, manager::LocalManager) = print(io, "LocalManager()")
×
469

470
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
45✔
471
    dir = params[:dir]
45✔
472
    exename = params[:exename]
45✔
473
    exeflags = params[:exeflags]
45✔
474
    bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
45✔
475
    env = Dict{String,String}(params[:env])
88✔
476

477
    # TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function
478
    #       together with load_path() etc. Might be useful to have when spawning julia
479
    #       processes outside of Distributed.jl too.
480
    # JULIA_(LOAD|DEPOT)_PATH are used to populate (LOAD|DEPOT)_PATH on startup,
481
    # but since (LOAD|DEPOT)_PATH might have changed they are re-serialized here.
482
    # Users can opt-out of this by passing `env = ...` to addprocs(...).
483
    pathsep = Sys.iswindows() ? ";" : ":"
45✔
484
    if get(env, "JULIA_LOAD_PATH", nothing) === nothing
47✔
485
        env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep)
43✔
486
    end
487
    if get(env, "JULIA_DEPOT_PATH", nothing) === nothing
47✔
488
        env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep)
43✔
489
    end
490

491
    # If we haven't explicitly asked for threaded BLAS, prevent OpenBLAS from starting
492
    # up with multiple threads, thereby sucking up a bunch of wasted memory on Windows.
493
    if !params[:enable_threaded_blas] &&
45✔
494
       get(env, "OPENBLAS_NUM_THREADS", nothing) === nothing
495
        env["OPENBLAS_NUM_THREADS"] = "1"
44✔
496
    end
497
    # Set the active project on workers using JULIA_PROJECT.
498
    # Users can opt-out of this by (i) passing `env = ...` or (ii) passing
499
    # `--project=...` as `exeflags` to addprocs(...).
500
    project = Base.ACTIVE_PROJECT[]
45✔
501
    if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing
46✔
502
        env["JULIA_PROJECT"] = project
6✔
503
    end
504

505
    for i in 1:manager.np
90✔
506
        cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
74✔
507
        io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
74✔
508
        write_cookie(io)
74✔
509

510
        wconfig = WorkerConfig()
1,480✔
511
        wconfig.process = io
74✔
512
        wconfig.io = io.out
74✔
513
        wconfig.enable_threaded_blas = params[:enable_threaded_blas]
74✔
514
        push!(launched, wconfig)
74✔
515
    end
103✔
516

517
    notify(c)
45✔
518
end
519

520
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
170✔
521
    if op === :interrupt
170✔
522
        kill(config.process, 2)
×
523
    end
524
end
525

526
"""
527
    launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
528

529
Implemented by cluster managers. For every Julia worker launched by this function, it should
530
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
531
once all workers requested by `manager` have been launched. `params` is a dictionary of all
532
keyword arguments [`addprocs`](@ref) was called with.
533
"""
534
launch
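
To make the contract concrete, here is a hedged sketch of a custom manager. `MyLocalManager`, its field, and the worker command line are hypothetical and loosely modelled on `LocalManager` above; this is not an official API example:

```julia
using Distributed

struct MyLocalManager <: ClusterManager
    np::Int
end

function Distributed.launch(manager::MyLocalManager, params::Dict,
                            launched::Array, launch_ntfy::Condition)
    exename  = params[:exename]
    exeflags = params[:exeflags]
    for _ in 1:manager.np
        io = open(detach(`$exename $exeflags --worker`), "r+")
        Distributed.write_cookie(io)   # hand the cluster cookie to the worker over stdin
        wconfig = WorkerConfig()
        wconfig.process = io
        wconfig.io = io.out
        push!(launched, wconfig)       # one WorkerConfig per launched worker
        notify(launch_ntfy)            # wake any task waiting for new workers
    end
end
```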
535

536
"""
537
    manage(manager::ClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
538

539
Implemented by cluster managers. It is called on the master process, during a worker's
540
lifetime, with appropriate `op` values:
541

542
- with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
543
- with `:interrupt` when `interrupt(workers)` is called. The `ClusterManager`
544
  should signal the appropriate worker with an interrupt signal.
545
- with `:finalize` for cleanup purposes.
546
"""
547
manage
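
A matching hedged sketch for the same hypothetical manager, acting only on `:interrupt` in the way `manage(::LocalManager, ...)` does above:

```julia
function Distributed.manage(manager::MyLocalManager, id::Integer,
                            config::WorkerConfig, op::Symbol)
    if op === :interrupt
        # deliver SIGINT to the locally launched worker process;
        # :register, :deregister and :finalize need no action for this manager
        kill(config.process, 2)
    end
end
```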
548

549
# DefaultClusterManager for the default TCP transport - used by both SSHManager and LocalManager
550

551
struct DefaultClusterManager <: ClusterManager
552
end
81✔
553

554
const tunnel_hosts_map = Dict{String, Semaphore}()
555

556
"""
557
    connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
558

559
Implemented by cluster managers using custom transports. It should establish a logical
560
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
561
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
562
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
563
ensure that messages are delivered and received completely and in order.
564
`connect(manager::ClusterManager.....)` sets up TCP/IP socket connections between
565
workers.
566
"""
567
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
98✔
568
    if config.connect_at !== nothing
98✔
569
        # this is a worker-to-worker setup call.
570
        return connect_w2w(pid, config)
14✔
571
    end
572

573
    # master connecting to workers
574
    if config.io !== nothing
84✔
575
        (bind_addr, port::Int) = read_worker_host_port(config.io)
84✔
576
        pubhost = something(config.host, bind_addr)
80✔
577
        config.host = pubhost
80✔
578
        config.port = port
80✔
579
    else
580
        pubhost = notnothing(config.host)
×
581
        port = notnothing(config.port)
×
582
        bind_addr = something(config.bind_addr, pubhost)
×
583
    end
584

585
    tunnel = something(config.tunnel, false)
80✔
586

587
    s = split(pubhost,'@')
80✔
588
    user = ""
80✔
589
    if length(s) > 1
80✔
590
        user = s[1]
×
591
        pubhost = s[2]
×
592
    else
593
        if haskey(ENV, "USER")
80✔
594
            user = ENV["USER"]
73✔
595
        elseif tunnel
7✔
596
            error("USER must be specified either in the environment ",
×
597
                  "or as part of the hostname when tunnel option is used")
598
        end
599
    end
600

601
    if tunnel
80✔
602
        if !haskey(tunnel_hosts_map, pubhost)
×
603
            tunnel_hosts_map[pubhost] = Semaphore(something(config.max_parallel, typemax(Int)))
×
604
        end
605
        sem = tunnel_hosts_map[pubhost]
×
606

607
        sshflags = notnothing(config.sshflags)
×
608
        multiplex = something(config.multiplex, false)
×
609
        acquire(sem)
×
610
        try
×
611
            (s, bind_addr, forward) = connect_to_worker_with_tunnel(pubhost, bind_addr, port, user, sshflags, multiplex)
×
612
            config.forward = forward
×
613
        finally
614
            release(sem)
×
615
        end
616
    else
617
        (s, bind_addr) = connect_to_worker(bind_addr, port)
80✔
618
    end
619

620
    config.bind_addr = bind_addr
80✔
621

622
    # write out a subset of the connect_at required for further worker-worker connection setups
623
    config.connect_at = (bind_addr, port)
80✔
624

625
    if config.io !== nothing
80✔
626
        let pid = pid
80✔
627
            redirect_worker_output(pid, notnothing(config.io))
160✔
628
        end
629
    end
630

631
    (s, s)
80✔
632
end
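
For a manager with a custom transport, the method above would instead take roughly this shape; `MyTransportManager` and `setup_transport_streams` are hypothetical names used only to show the required return value:

```julia
function Distributed.connect(manager::MyTransportManager, pid::Int, config::WorkerConfig)
    # establish a logical connection to worker `pid` over the custom transport;
    # messages from `pid` are read off `instrm`, messages to `pid` are written to `outstrm`
    instrm, outstrm = setup_transport_streams(pid, config)   # hypothetical helper
    (instrm, outstrm)
end
```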
633

634
function connect_w2w(pid::Int, config::WorkerConfig)
14✔
635
    (rhost, rport) = notnothing(config.connect_at)::Tuple{String, Int}
28✔
636
    config.host = rhost
14✔
637
    config.port = rport
14✔
638
    (s, bind_addr) = connect_to_worker(rhost, rport)
14✔
639
    (s,s)
14✔
640
end
641

642
const client_port = Ref{UInt16}(0)
643

644
function socket_reuse_port(iptype)
94✔
645
    if ccall(:jl_has_so_reuseport, Int32, ()) == 1
94✔
646
        sock = TCPSocket(delay = false)
94✔
647

648
        # Some systems (e.g. Linux) require the port to be bound before setting REUSEPORT
649
        bind_early = Sys.islinux()
94✔
650

651
        bind_early && bind_client_port(sock, iptype)
94✔
652
        rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), sock.handle)
94✔
653
        if rc < 0
94✔
654
            close(sock)
×
655

656
            # This is an issue only on systems with lots of client connections, hence delay the warning
657
            nworkers() > 128 && @warn "Error trying to reuse client port number, falling back to regular socket" maxlog=1
×
658

659
            # provide a clean new socket
660
            return TCPSocket()
×
661
        end
662
        bind_early || bind_client_port(sock, iptype)
94✔
663
        return sock
94✔
664
    else
665
        return TCPSocket()
×
666
    end
667
end
668

669
function bind_client_port(sock::TCPSocket, iptype)
94✔
670
    bind_host = iptype(0)
94✔
671
    if Sockets.bind(sock, bind_host, client_port[])
94✔
672
        _addr, port = getsockname(sock)
94✔
673
        client_port[] = port
94✔
674
    end
675
    return sock
94✔
676
end
677

678
function connect_to_worker(host::AbstractString, port::Integer)
94✔
679
    # Avoid calling getaddrinfo if possible - involves a DNS lookup
680
    # host may be a stringified ipv4 / ipv6 address or a dns name
681
    bind_addr = nothing
94✔
682
    try
94✔
683
        bind_addr = parse(IPAddr,host)
188✔
684
    catch
685
        bind_addr = getaddrinfo(host)
×
686
    end
687

688
    iptype = typeof(bind_addr)
94✔
689
    sock = socket_reuse_port(iptype)
94✔
690
    connect(sock, bind_addr, UInt16(port))
94✔
691

692
    (sock, string(bind_addr))
94✔
693
end
694

695

696
function connect_to_worker_with_tunnel(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags, multiplex)
×
697
    localport = ssh_tunnel(tunnel_user, host, bind_addr, UInt16(port), sshflags, multiplex)
×
698
    s = connect("localhost", localport)
×
699
    forward = "$localport:$bind_addr:$port"
×
700
    (s, bind_addr, forward)
×
701
end
702

703

704
function cancel_ssh_tunnel(config::WorkerConfig)
×
705
    host = notnothing(config.host)
×
706
    sshflags = notnothing(config.sshflags)
×
707
    tunnel = something(config.tunnel, false)
×
708
    multiplex = something(config.multiplex, false)
×
709
    if tunnel && multiplex
×
710
        forward = notnothing(config.forward)
×
711
        run(`ssh $sshflags -O cancel -L $forward $host`)
×
712
    end
713
end
714

715

716
"""
717
    kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
718

719
Implemented by cluster managers.
720
It is called on the master process, by [`rmprocs`](@ref).
721
It should cause the remote worker specified by `pid` to exit.
722
`kill(manager::ClusterManager.....)` executes a remote `exit()`
723
on `pid`.
724
"""
725
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
10✔
726
    remote_do(exit, pid)
17✔
727
    nothing
7✔
728
end
729

730
function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
×
731
    remote_do(exit, pid)
×
732
    cancel_ssh_tunnel(config)
×
733
    nothing
×
734
end
735

736
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
106✔
737
    # First, try sending `exit()` to the remote over the usual control channels
738
    remote_do(exit, pid)
105✔
739

740
    timer_task = @async begin
52✔
741
        sleep(exit_timeout)
52✔
742

743
        # Check to see if our child exited, and if not, send an actual kill signal
744
        if !process_exited(config.process)
52✔
745
            @warn("Failed to gracefully kill worker $(pid), sending SIGTERM")
×
746
            kill(config.process, Base.SIGTERM)
×
747

748
            sleep(term_timeout)
×
749
            if !process_exited(config.process)
×
750
                @warn("Worker $(pid) ignored SIGTERM, sending SIGKILL")
×
751
                kill(config.process, Base.SIGKILL)
×
752
            end
753
        end
754
    end
755
    errormonitor(timer_task)
52✔
756
    return nothing
52✔
757
end
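
In practice these `kill` methods are not called directly; they are invoked by `rmprocs`, for example:

```julia
using Distributed

rmprocs(workers(); waitfor = 30)   # ask every worker to exit, waiting up to 30 seconds
```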