chanzuckerberg / miniwdl, build 14677860390
26 Apr 2025 04:47AM UTC. Coverage: 95.213% (-0.04%) from 95.252%.

Pull Request #751 (mlin): nvidia gpu
Merge remote-tracking branch 'origin/main' into mlin/nvidia

101 of 114 new or added lines in 2 files covered (88.6%).
5 existing lines in 1 file now uncovered.
7459 of 7834 relevant lines covered (95.21%); 0.95 hits per line.

Source file: /WDL/runtime/backend/docker_swarm.py (91.48% covered)
"""
Default TaskContainer implementation using Docker Swarm
"""

import os
import json
import stat
import time
import shlex
import uuid
import base64
import random
import hashlib
import logging
import threading
import traceback
import contextlib
from io import BytesIO
from typing import List, Dict, Set, Optional, Any, Callable, Iterable, Tuple
import docker
from ... import Error
from ..._util import chmod_R_plus, TerminationSignalFlag
from ..._util import StructuredLogMessage as _
from .. import config
from ..error import Interrupted, Terminated
from ..task_container import TaskContainer


class SwarmContainer(TaskContainer):
    """
    TaskContainer docker (swarm) runtime
    """

    _limits: Dict[str, int] = {}

    @classmethod
    def global_init(cls, cfg: config.Loader, logger: logging.Logger) -> None:
        cls._limits = _init_docker_swarm(cfg, logger)

    @classmethod
    def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> Dict[str, int]:
        assert cls._limits, f"{cls.__name__}.global_init"
        return cls._limits

    create_service_kwargs: Optional[Dict[str, Any]] = None  # DEPRECATED

    _bind_input_files: bool = True
    _observed_states: Optional[Set[str]] = None

    def copy_input_files(self, logger: logging.Logger) -> None:
        assert self._bind_input_files
        super().copy_input_files(logger)
        # now that files have been copied into the working dir, it won't be necessary to bind-mount
        # them individually
        self._bind_input_files = False

    def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: str) -> int:
        self._observed_states = set()
        with open(os.path.join(self.host_dir, "command"), "w") as outfile:
            outfile.write(command)

        # prepare docker configuration
        client = docker.from_env(version="auto", timeout=900)

        polling_period = self.cfg.get_float("docker_swarm", "polling_period_seconds")
        server_error_retries = self.cfg.get_int("docker_swarm", "server_error_retries")

        # run container as a transient docker swarm service, letting docker handle the resource
        # scheduling (e.g. waiting until requested # of CPUs are available).
        svc = None
        exit_code = None
        try:
            # create the service via low-level APIClient
            svc = self.create_service(logger, client)

            # stream stderr into log
            with contextlib.ExitStack() as cleanup:
                poll_stderr = cleanup.enter_context(self.poll_stderr_context(logger))

                # poll for container exit
                running_states = {"preparing", "running"}
                was_running = False
                server_errors = 0
                while exit_code is None:
                    # spread out work over the GIL
                    time.sleep(random.uniform(polling_period * 0.5, polling_period * 1.5))
                    if terminating():
                        quiet = not self._observed_states.difference(
                            # reduce log noise if the terminated task only sat in docker's queue
                            {"(UNKNOWN)", "new", "allocated", "pending"}
                        )
                        if not quiet:
                            self.poll_service(logger, svc, verbose=True)
                        raise Terminated(quiet=quiet)
                    try:
                        exit_code = self.poll_service(logger, svc)
                        if server_errors:
                            logger.error("docker service status polling succeeded after retries")
                        server_errors = 0
                    except docker.errors.APIError as exn:
                        logger.debug(traceback.format_exc())
                        logger.error(
                            _(
                                "docker service status polling error",
                                tries_remaining=(server_error_retries - server_errors),
                                exception=str(exn),
                            )
                        )
                        # retry dockerd errors (5xx status code)
                        if not exn.is_server_error() or server_errors >= server_error_retries:
                            raise
                        server_errors += 1
                    if not was_running and self._observed_states.intersection(running_states):
                        # indicate actual container start in status bar
                        # 'preparing' is when docker is pulling and extracting the image, which can
                        # be a lengthy and somewhat intensive operation, so we count it as running.
                        cleanup.enter_context(self.task_running_context())
                        was_running = True
                    if "running" in self._observed_states:
                        poll_stderr()

                logger.debug(
                    _(
                        "docker service logs",
                        stdout=list(msg.decode().rstrip() for msg in svc.logs(stdout=True)),
                        stderr=list(msg.decode().rstrip() for msg in svc.logs(stderr=True)),
                    )
                )

            # retrieve and check container exit status
            assert isinstance(exit_code, int)
            return exit_code
        finally:
            if svc:
                for attempt in range(999):
                    try:
                        svc.remove()
                        if attempt:
                            logger.error("docker service removal succeeded after retries")
                        break
                    except Exception as exn:
                        logger.debug(traceback.format_exc())
                        logger.error(
                            _(
                                "docker service removal error",
                                tries_remaining=(server_error_retries - attempt),
                                exception=str(exn),
                            )
                        )
                        if attempt >= server_error_retries:
                            break
                        time.sleep(polling_period)
            self.chown(
                logger, client, isinstance(exit_code, int) and self.success_exit_code(exit_code)
            )
            client.close()

    def create_service(
        self,
        logger: logging.Logger,
        client: docker.DockerClient,
    ) -> docker.models.services.Service:
        """
        Create a Docker Swarm service which will schedule the task container.

        We use the low-level client.api.create_service() because the higher-level Service model
        doesn't support GPU requests.
        """
        if self.create_service_kwargs:
            # plugins should be updated to subclass SwarmContainer and override
            # prepare_create_service() instead of the old create_service_kwargs dict.
            logger.warning(
                _(
                    "old plug-in (using deprecated SwarmContainer.create_service_kwargs)"
                    " may be incompatible with this miniwdl version"
                )
            )
        svc_kwargs = self.prepare_create_service(logger, client)
        logger.debug(_("docker create_service", **svc_kwargs))
        service_id = client.api.create_service(**svc_kwargs)["ID"]
        # fetch high-level Service model
        svc = client.services.get(service_id)
        logger.debug(_("docker service", name=svc.name, short_id=svc.short_id))
        return svc

    def prepare_create_service(
        self, logger: logging.Logger, client: docker.DockerClient
    ) -> Dict[str, Any]:
        """
        Prepare kwargs for create_service()

        Plugin subclasses might override this method (or its sub-methods) to customize.
        """

        task_template_args, task_template_kwargs = self.prepare_task_template(logger, client)
        svc_kwargs: Dict[str, Any] = {
            "task_template": docker.types.TaskTemplate(*task_template_args, **task_template_kwargs),
            "name": self.unique_service_name(self.run_id),
            "labels": {"miniwdl_run_id": self.run_id},
        }
        network = self.runtime_values.get("docker_network", None)
        if network:
            if network in self.cfg.get_list("docker_swarm", "allow_networks"):
                svc_kwargs["networks"] = [network]
            else:
                logger.warning(
                    _(
                        "runtime.docker_network ignored; network name must appear in JSON list from config"
                        " [docker_swarm] allow_networks / env MINIWDL__DOCKER_SWARM__ALLOW_NETWORKS",
                        docker_network=network,
                    )
                )
        return svc_kwargs

    def prepare_task_template(
        self, logger: logging.Logger, client: docker.DockerClient
    ) -> Tuple[List[Any], Dict[str, Any]]:
        "Prepare args & kwargs for TaskTemplate"

        container_spec_args, container_spec_kwargs = self.prepare_container_spec(logger, client)
        container_spec = docker.types.ContainerSpec(*container_spec_args, **container_spec_kwargs)
        if self.runtime_values.get("gpu", False):
            # docker.types.ContainerSpec doesn't have a host_config kwarg, so we set it by dict key
            container_spec["HostConfig"] = docker.types.HostConfig(
                client.api.api_version,
                device_requests=[
                    docker.types.DeviceRequest(driver="nvidia", count=-1, capabilities=[["gpu"]])
                ],
            )

        # build task template
        resources: Dict[str, Any] = {}
        cpu = self.runtime_values.get("cpu", 0)
        if cpu > 0:
            # the cpu unit expected by swarm is "NanoCPUs"
            resources["cpu_limit"] = cpu * 1_000_000_000
            resources["cpu_reservation"] = cpu * 1_000_000_000
        memory_reservation = self.runtime_values.get("memory_reservation", 0)
        if memory_reservation > 0:
            resources["mem_reservation"] = memory_reservation
        memory_limit = self.runtime_values.get("memory_limit", 0)
        if memory_limit > 0:
            resources["mem_limit"] = memory_limit

        kwargs = {
            "restart_policy": docker.types.RestartPolicy("none"),
        }
        if resources:
            logger.debug(_("docker resources", **resources))
            kwargs["resources"] = docker.types.Resources(**resources)
        return [container_spec], kwargs
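
    # For orientation: the GPU request above is the swarm-service counterpart of what
    # `docker run --gpus all` sets up. A minimal non-swarm sketch of the same request via
    # docker-py (the CUDA image name is a hypothetical placeholder, not used by this module):
    #
    #   client = docker.from_env()
    #   client.containers.run(
    #       "nvidia/cuda:12.2.0-base-ubuntu22.04",  # hypothetical CUDA-enabled image
    #       "nvidia-smi",
    #       device_requests=[
    #           docker.types.DeviceRequest(driver="nvidia", count=-1, capabilities=[["gpu"]])
    #       ],
    #   )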

    def prepare_container_spec(
        self, logger: logging.Logger, client: docker.DockerClient
    ) -> Tuple[List[Any], Dict[str, Any]]:
        "Prepare args & kwargs for ContainerSpec"

        if "inlineDockerfile" in self.runtime_values:
            image_tag = self.build_inline_dockerfile(logger.getChild("inlineDockerfile"), client)
        else:
            image_tag = self.resolve_tag(
                logger, client, self.runtime_values.get("docker", "ubuntu:20.04")
            )
        command = [
            "/bin/sh",
            "-c",
            self.cfg.get("task_runtime", "command_shell")
            + " ../command >> ../stdout.txt 2>> ../stderr.txt",
        ]
        env = [f"{k}={v}" for (k, v) in self.runtime_values.get("env", {}).items()]

        # build container spec
        container_spec_kwargs = {
            "command": command,
            "env": env,
            "workdir": os.path.join(self.container_dir, "work"),
            "mounts": self.prepare_mounts(logger),
            "labels": {"miniwdl_run_id": self.run_id},
            # add invoking user's group to ensure that command can access the mounted working
            # directory even if the docker image assumes some arbitrary uid
            "groups": [str(os.getegid())],
        }
        if container_spec_kwargs["groups"] == ["0"]:
            logger.warning(
                "container command will run as a root/wheel group member, since this is your primary group (gid=0)"
            )
        if self.cfg["task_runtime"].get_bool("as_user"):
            container_spec_kwargs["user"] = f"{os.geteuid()}:{os.getegid()}"
            logger.info(_("docker user", uid_gid=container_spec_kwargs["user"]))
            if os.geteuid() == 0:
                logger.warning(
                    "container command will run explicitly as root, since you are root and set --as-me"
                )
        if self.runtime_values.get("privileged", False) is True:
            logger.warning("runtime.privileged enabled (security & portability warning)")
            container_spec_kwargs["cap_add"] = ["ALL"]
        return [image_tag], container_spec_kwargs

    def resolve_tag(
        self, logger: logging.Logger, client: docker.DockerClient, image_tag: str
    ) -> str:
        if ":" not in image_tag:
            # seems we need to do this explicitly under some configurations -- issue #232
            image_tag += ":latest"
        # fetch image info
        try:
            image_attrs = client.images.get(image_tag).attrs
        except docker.errors.ImageNotFound:
            try:
                logger.info(_("docker pull", tag=image_tag))
                client.images.pull(image_tag)
                image_attrs = client.images.get(image_tag).attrs
            except docker.errors.ImageNotFound:
                raise Error.RuntimeError("docker image not found: " + image_tag) from None
        image_log = {"tag": image_tag, "id": image_attrs["Id"]}
        # resolve mutable tag to immutable RepoDigest if possible, to ensure identical image will
        # be used across a multi-node swarm
        image_digest = None
        if bool(image_attrs.get("RepoDigests")) and image_tag not in image_attrs["RepoDigests"]:
            image_digest = image_attrs["RepoDigests"][0]
            image_tag = image_digest
        image_log["RepoDigest"] = image_digest
        logger.notice(_("docker image", **image_log))
        return image_tag

    def prepare_mounts(self, logger: logging.Logger) -> List[docker.types.Mount]:
        def escape(s):
            # docker processes {{ interpolations }}
            return s.replace("{{", '{{"{{"}}')

        mounts = []
        # mount input files/directories and command
        if self._bind_input_files:
            perm_warn = True
            for host_path, container_path in self.input_path_map.items():
                st = os.stat(host_path.rstrip("/"))
                if perm_warn and not (
                    (st.st_mode & stat.S_IROTH)
                    or (st.st_gid == os.getegid() and (st.st_mode & stat.S_IRGRP))
                ):
                    # file is neither world-readable, nor group-readable for the invoking user's primary group
                    logger.warning(
                        _(
                            "one or more input file(s) could be inaccessible to docker images that don't run as root; "
                            "it may be necessary to `chmod g+r` them, or set --copy-input-files",
                            example_file=host_path,
                        )
                    )
                    perm_warn = False
                assert (not container_path.endswith("/")) or stat.S_ISDIR(st.st_mode)
                host_mount_point = os.path.join(
                    self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir)
                )
                if not os.path.exists(host_mount_point):
                    self.touch_mount_point(
                        host_mount_point + ("/" if container_path.endswith("/") else "")
                    )
                mounts.append(
                    docker.types.Mount(
                        escape(container_path.rstrip("/")),
                        escape(host_path.rstrip("/")),
                        type="bind",
                        read_only=True,
                    )
                )
        mounts.append(
            docker.types.Mount(
                escape(os.path.join(self.container_dir, "command")),
                escape(os.path.join(self.host_dir, "command")),
                type="bind",
                read_only=True,
            )
        )
        # mount stdout, stderr, and working directory read/write
        self.touch_mount_point(self.host_stdout_txt())
        mounts.append(
            docker.types.Mount(
                escape(os.path.join(self.container_dir, "stdout.txt")),
                escape(self.host_stdout_txt()),
                type="bind",
            )
        )
        self.touch_mount_point(self.host_stderr_txt())
        mounts.append(
            docker.types.Mount(
                escape(os.path.join(self.container_dir, "stderr.txt")),
                escape(self.host_stderr_txt()),
                type="bind",
            )
        )
        mounts.append(
            docker.types.Mount(
                escape(os.path.join(self.container_dir, "work")),
                escape(self.host_work_dir()),
                type="bind",
            )
        )

        # providing g+rw on files (and g+rwx on directories) in the task directory ensures unix
        # permissions won't block the command regardless of which uid it runs as in the container
        # (since we add the container to the invoking user's primary group)
        for mnt in [
            os.path.join(self.host_dir, "command"),
            self.host_stdout_txt(),
            self.host_stderr_txt(),
            self.host_work_dir(),
        ]:
            chmod_R_plus(mnt, file_bits=0o660, dir_bits=0o770)

        return mounts

    def poll_service(
        self, logger: logging.Logger, svc: docker.models.services.Service, verbose: bool = False
    ) -> Optional[int]:
        state = "(UNKNOWN)"
        status = {}

        svc.reload()
        assert svc.attrs["Spec"]["Labels"]["miniwdl_run_id"] == self.run_id
        tasks = svc.tasks()
        if tasks:
            assert len(tasks) == 1, "docker service should have at most 1 task"
            status = tasks[0]["Status"]
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(_("docker task status", **status))
            state = status["State"]
        else:
            assert len(self._observed_states or []) <= 1, (
                "docker task shouldn't disappear from service"
            )

        # references on docker task states:
        # https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/
        # https://github.com/docker/swarmkit/blob/master/design/task_model.md
        # https://github.com/moby/moby/blob/8fbf2598f58fb212230e6ddbcfbde628b0458250/api/types/swarm/task.go#L12

        # log each new state
        assert isinstance(state, str) and isinstance(self._observed_states, set)
        if state not in self._observed_states:
            loginfo = {"service": svc.short_id}
            if tasks:
                loginfo["task"] = tasks[0]["ID"][:10]
                if "NodeID" in tasks[0]:
                    loginfo["node"] = tasks[0]["NodeID"][:10]
            if status.get("DesiredState") not in (None, state):
                loginfo["desired"] = status["DesiredState"]
            logmsg = status.get("Err", status.get("Message", None))
            if logmsg and logmsg != state:
                loginfo["message"] = logmsg
            method = logger.info
            if state == "running":
                method = logger.notice
            elif state in ["failed", "shutdown", "rejected", "orphaned", "remove"]:
                method = logger.error
            method(_(f"docker task {state}", **loginfo))
            self._observed_states.add(state)

        # determine whether docker task has exited
        exit_code = None
        if "ExitCode" in status.get("ContainerStatus", {}):
            exit_code = status["ContainerStatus"]["ExitCode"]
            assert isinstance(exit_code, int)

        if state in ("complete", "failed"):
            msg = _("docker task exit", state=state, exit_code=exit_code)
            if state == "failed":
                logger.error(msg)
            else:
                logger.notice(msg)
            assert isinstance(exit_code, int) and (exit_code == 0) == (state == "complete")
            return exit_code
        elif {state, status.get("DesiredState")}.intersection(
            {"rejected", "shutdown", "orphaned", "remove"}
        ) or exit_code not in (None, 0):
            # "rejected" state usually arises from nonexistent docker image.
            # if the worker assigned a task goes down, any of the following can manifest:
            #   - exit_code=-1 with state running (or other non-terminal)
            #   - state shutdown, orphaned, remove
            #   - desired_state shutdown
            # also see GitHub issue #374
            raise (Error.RuntimeError if state == "rejected" else Interrupted)(
                f"docker task {state}"
                + (
                    (", desired state " + status["DesiredState"])
                    if status.get("DesiredState") not in (None, state)
                    else ""
                )
                + (f", exit code = {exit_code}" if exit_code not in (None, 0) else "")
                + (f": {status['Err']}" if "Err" in status else "")
            )

        return None

    def chown(self, logger: logging.Logger, client: docker.DockerClient, success: bool) -> None:
        """
        After task completion, chown all files in the working directory to the invoking user:group,
        instead of leaving them frequently owned by root or some other arbitrary user id (image-
        dependent). We do this in a funny way via Docker; see GitHub issue #271 for discussion of
        alternatives and their problems.
        """
        if (
            not self.cfg.get_bool("file_io", "chown")
            or self.cfg["task_runtime"].get_bool("as_user")
            or (os.geteuid() == 0 and os.getegid() == 0)
        ):
            return
        paste = shlex.quote(
            os.path.join(
                self.container_dir, f"work{self.try_counter if self.try_counter > 1 else ''}"
            )
        )
        script = f"""
        (find {paste} -type d -print0 && find {paste} -type f -print0 \
            && find {paste} -type l -print0) \
            | xargs -0 -P 10 chown -Ph {os.geteuid()}:{os.getegid()}
        """.strip()
        volumes = {self.host_dir: {"bind": self.container_dir, "mode": "rw"}}
        logger.debug(_("post-task chown", script=script, volumes=volumes))
        try:
            chowner = None
            try:
                chowner = client.containers.run(
                    "alpine:3",
                    name=self.unique_service_name("chown-" + self.run_id),
                    command=["/bin/ash", "-eo", "pipefail", "-c", script],
                    volumes=volumes,
                    detach=True,
                )
                chowner_status = chowner.wait()
                assert (
                    isinstance(chowner_status, dict) and chowner_status.get("StatusCode", -1) == 0
                ), (
                    f"post-task chown failed: {chowner_status}"
                    + "; try setting [file_io] chown = false"
                )
            finally:
                if chowner:
                    chowner.remove()
        except Exception as exn:
            logger.debug(traceback.format_exc())
            if success:
                raise
            logger.error(_("post-task chown also failed", exception=str(exn)))

    def unique_service_name(self, run_id: str) -> str:
        # We need to give each service a name unique on the swarm; collisions cause the service
        # create request to fail. Considerations:
        # 1. [0-9A-Za-z-]{1,63} -- case is remembered, but comparison ignores it.
        # 2. It's useful for the names to be mostly human-readable via `docker service ls` to get a
        #    sense of what's happening on the swarm. Unfortunately, that tool truncates the display
        #    names pretty short, so prefer human-readability of the leftmost part of the name.
        # 3. PID+seqno isn't sufficient because the swarm could receive submissions from miniwdl
        #    running in different hosts/VMs/containers with potentially colliding PIDs.
        # see GitHub issues: 327, 368
        junk = hashlib.sha256()
        junk.update(uuid.uuid1().bytes)
        junk.update(uuid.uuid4().bytes)
        junks = base64.b32encode(junk.digest()[:15]).decode().lower()
        assert len(junks) == 24
        return f"wdl-{run_id[:34]}-{junks}"  # 4 + 34 + 1 + 24 = 63

    _build_inline_dockerfile_lock: threading.Lock = threading.Lock()

    def build_inline_dockerfile(
        self,
        logger: logging.Logger,
        client: docker.DockerClient,
        tries: Optional[int] = None,
    ) -> str:
        logger.warning("runtime.inlineDockerfile is an experimental extension, subject to change")
        # formulate image tag using digest of dockerfile text
        dockerfile_utf8 = self.runtime_values["inlineDockerfile"].encode("utf8")
        dockerfile_digest = hashlib.sha256(dockerfile_utf8).digest()
        tag_part1 = "miniwdl_auto_"
        tag_part3 = ":" + base64.b32encode(dockerfile_digest[:15]).decode().lower()
        tag_part2 = self.run_id.lower()
        if "-" in tag_part2:
            tag_part2 = tag_part2.split("-")[1]
        maxtag2 = 63 - len(tag_part1) - len(tag_part3)
        assert maxtag2 > 0
        tag = tag_part1 + tag_part2[:maxtag2] + tag_part3

        # short-circuit if digest-tagged image already exists
        try:
            existing = client.images.get(tag)
            logger.notice(_("docker build cached", tag=tag, id=existing.id))
            return tag
        except docker.errors.ImageNotFound:
            pass

        # prepare to tee docker build log to logger.verbose and a file
        build_logfile = os.path.join(self.host_dir, "inlineDockerfile.log")

        def write_log(stream: Iterable[Dict[str, str]]):
            # tee the log messages to logger.verbose and build_logfile
            with open(build_logfile, "w") as outfile:
                for d in stream:
                    if "stream" in d:
                        for msg in d["stream"].split("\n"):
                            msg = msg.rstrip()
                            if msg:
                                logger.verbose(msg)
                                print(msg, file=outfile)

        # run docker build
        try:
            with SwarmContainer._build_inline_dockerfile_lock:  # one build at a time
                logger.info(_("starting docker build", tag=tag))
                logger.debug(_("Dockerfile", txt=self.runtime_values["inlineDockerfile"]))
                image, build_log = client.images.build(fileobj=BytesIO(dockerfile_utf8), tag=tag)
        except docker.errors.BuildError as exn:
            # potentially retry, if task has runtime.maxRetries
            if isinstance(tries, int):
                tries -= 1
            else:
                tries = self.runtime_values.get("maxRetries", 0)
            if tries > 0:
                logger.error(
                    _("failed docker build will be retried", tries_remaining=tries, msg=exn.msg)
                )
                return self.build_inline_dockerfile(logger, client, tries=tries)
            else:
                write_log(exn.build_log)
                logger.error(_("docker build failed", msg=exn.msg, log=build_logfile))
                raise exn

        write_log(build_log)
        logger.notice(_("docker build", tag=image.tags[0], id=image.id, log=build_logfile))
        return tag
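
# As the deprecation notice in create_service() indicates, plugins customize service creation by
# subclassing SwarmContainer rather than setting create_service_kwargs. A minimal sketch (the
# subclass name and extra label are hypothetical, for illustration only):
#
#   class MySwarmContainer(SwarmContainer):
#       def prepare_create_service(self, logger, client):
#           svc_kwargs = super().prepare_create_service(logger, client)
#           svc_kwargs["labels"]["my_plugin"] = "1"  # e.g. tack on a custom service label
#           return svc_kwargs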


def _init_docker_swarm(cfg: config.Loader, logger: logging.Logger) -> Dict[str, int]:
    """
    Initialize Docker Swarm and detect the host's CPU and memory resources.
    """
    worker_nodes = []

    with contextlib.ExitStack() as cleanup:
        client = docker.from_env(version="auto")
        cleanup.callback(lambda: client.close())
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        logger.debug("dockerd :: " + json.dumps(client.version())[1:-1])

        # initialize swarm
        state = "(unknown)"
        while True:
            if terminating():
                raise Terminated()

            info = client.info()
            if "Swarm" in info and "LocalNodeState" in info["Swarm"]:
                logger.debug(_("swarm info", **info["Swarm"]))
                state = info["Swarm"]["LocalNodeState"]

            # https://github.com/moby/moby/blob/e7b5f7dbe98c559b20c0c8c20c0b31a6b197d717/api/types/swarm/swarm.go#L185
            if state == "active":
                if info["Swarm"]["ControlAvailable"]:
                    worker_nodes = [
                        node
                        for node in client.nodes.list()
                        if node.attrs["Spec"]["Availability"] == "active"
                        and node.attrs["Status"]["State"] == "ready"
                    ]
                    if worker_nodes:
                        break
                else:
                    logger.warning(
                        "this host is a docker swarm worker but not a manager; "
                        "WDL task scheduling requires manager access"
                    )
            elif state == "inactive" and cfg["docker_swarm"].get_bool("auto_init"):
                logger.warning(
                    "docker swarm is inactive on this host; "
                    "performing `docker swarm init --advertise-addr 127.0.0.1 --listen-addr 127.0.0.1`"
                )
                try:
                    client.swarm.init(advertise_addr="127.0.0.1", listen_addr="127.0.0.1")
                except Exception as exn:
                    # smooth over race condition with multiple processes trying to init swarm
                    if "already part of a swarm" not in str(exn):
                        raise exn

            logger.notice(
                _(
                    "waiting for local docker swarm manager & worker(s)",
                    manager=state,
                    workers=len(worker_nodes),
                )
            )
            time.sleep(2)

        miniwdl_services = [
            d
            for d in [s.attrs for s in client.services.list()]
            if "Spec" in d and "Labels" in d["Spec"] and "miniwdl_run_id" in d["Spec"]["Labels"]
        ]
        if miniwdl_services and cfg["docker_swarm"].get_bool("auto_init"):
            logger.warning(
                "docker swarm lists existing miniwdl-related services. "
                "This is normal if other miniwdl processes are running concurrently; "
                "otherwise, stale state could interfere with this run. To reset it, `docker swarm leave --force`"
            )

    # Detect swarm's CPU & memory resources. Even on a localhost swarm, these may be less than
    # multiprocessing.cpu_count() and psutil.virtual_memory().total; in particular on macOS,
    # where Docker containers run in a virtual machine with limited resources.
    resources_max_mem: Dict[str, int] = {}
    total_NanoCPUs = 0
    total_MemoryBytes = 0

    for node in worker_nodes:
        logger.debug(
            _(
                "swarm worker",
                ID=node.attrs["ID"],
                Spec=node.attrs["Spec"],
                Hostname=node.attrs["Description"]["Hostname"],
                Resources=node.attrs["Description"]["Resources"],
                Status=node.attrs["Status"],
            )
        )
        resources = node.attrs["Description"]["Resources"]
        total_NanoCPUs += resources["NanoCPUs"]
        total_MemoryBytes += resources["MemoryBytes"]
        if (
            not resources_max_mem
            or resources["MemoryBytes"] > resources_max_mem["MemoryBytes"]
            or (
                resources["MemoryBytes"] == resources_max_mem["MemoryBytes"]
                and resources["NanoCPUs"] > resources_max_mem["NanoCPUs"]
            )
        ):
            resources_max_mem = resources

    max_cpu = int(resources_max_mem["NanoCPUs"] / 1_000_000_000)
    max_mem = resources_max_mem["MemoryBytes"]
    logger.notice(
        _(
            "docker swarm resources",
            workers=len(worker_nodes),
            max_cpus=max_cpu,
            max_mem_bytes=max_mem,
            total_cpus=int(total_NanoCPUs / 1_000_000_000),
            total_mem_bytes=total_MemoryBytes,
        )
    )
    return {"cpu": max_cpu, "mem_bytes": max_mem}
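
For context, the backend's entry points are the two classmethods at the top of the file: global_init() runs _init_docker_swarm() once per process, and detect_resource_limits() then reports the detected swarm resources. A minimal sketch of that flow follows; miniwdl normally drives this internally, and constructing config.Loader from just a logger is an assumption here, not a documented recipe:

    import logging
    from WDL.runtime import config
    from WDL.runtime.backend.docker_swarm import SwarmContainer

    logger = logging.getLogger("swarm_demo")
    cfg = config.Loader(logger)  # assumed: load default miniwdl configuration
    SwarmContainer.global_init(cfg, logger)  # initializes/joins the local swarm as needed
    limits = SwarmContainer.detect_resource_limits(cfg, logger)
    print(limits)  # e.g. {"cpu": 8, "mem_bytes": ...} on a hypothetical 8-core host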