• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chanzuckerberg / miniwdl / 14677860390

26 Apr 2025 04:47AM UTC coverage: 95.213% (-0.04%) from 95.252%
14677860390

Pull #751

github

mlin
Merge remote-tracking branch 'origin/main' into mlin/nvidia
Pull Request #751: nvidia gpu

101 of 114 new or added lines in 2 files covered. (88.6%)

5 existing lines in 1 file now uncovered.

7459 of 7834 relevant lines covered (95.21%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.94
/WDL/runtime/task_container.py
1
"""
2
Abstract interface for task container runtime
3
"""
4

5
import os
1✔
6
import logging
1✔
7
import shutil
1✔
8
import threading
1✔
9
import typing
1✔
10
import math
1✔
11
from typing import Callable, Iterable, Any, Dict, Optional, ContextManager, Set
1✔
12
from abc import ABC, abstractmethod
1✔
13
from contextlib import suppress
1✔
14
from .. import Error, Value, Type
1✔
15
from .._util import (
1✔
16
    TerminationSignalFlag,
17
    path_really_within,
18
    rmtree_atomic,
19
    PygtailLogger,
20
    parse_byte_size,
21
)
22
from .._util import StructuredLogMessage as _
1✔
23
from . import config, _statusbar
1✔
24
from .error import OutputError, Terminated, CommandFailed
1✔
25

26

27
class TaskContainer(ABC):
    """
    Base class for task containers, subclassed by runtime-specific backends (e.g. Docker).
    """

    # class stuff

    @classmethod
    def global_init(cls, cfg: config.Loader, logger: logging.Logger) -> None:
        """
        Perform any necessary one-time initialization of the underlying container backend. To be
        invoked once per process prior to any instantiation of the class.
        """
        raise NotImplementedError()

    @classmethod
    def detect_resource_limits(cls, cfg: config.Loader, logger: logging.Logger) -> Dict[str, int]:
        """
        Detect the maximum resources ("cpu" and "mem_bytes") that the underlying container backend
        will be able to provision for any one task.

        If determining this is at all costly, then backend should memoize (thread-safely and
        perhaps front-loaded in global_init).
        """
        raise NotImplementedError()

    # instance stuff

    run_id: str

    host_dir: str
    """
    :type: str

    The run directory (on the host)
    """

    container_dir: str
    """
    :type: str

    The scratch directory inside the container. The task command's working directory will be
    ``{container_dir}/work/``.
    """

    input_path_map: Dict[str, str]
    """
    :type: Dict[str,str]

    A mapping of host input file/directory paths to in-container mounted paths, maintained by
    ``add_paths``. Directory paths are distinguished by trailing slashes on both keys and values;
    the slashes often should be trimmed for use elsewhere.
    """

    input_path_map_rev: Dict[str, str]
    """
    Inverse of ``input_path_map`` (also maintained by ``add_paths``)
    """

    try_counter: int
    """
    :type: int

    Counter for number of retries; starts at 1 on the first attempt. On subsequent attempts, the
    names (on the host) of the working directory, stdout.txt, and stderr.txt may incorporate the
    count, to ensure their uniqueness.
    """

    runtime_values: Dict[str, Any]
    """
    Evaluated task runtime{} section, to be populated by process_runtime(). Typically the
    TaskContainer backend needs to honor cpu, memory_limit, memory_reservation, docker, env.
    Retry logic (maxRetries, preemptible) is handled externally.
    """

    stderr_callback: Optional[Callable[[str], None]]
    """
    A function called line-by-line for the task's standard error stream, iff verbose logging is
    enabled. If provided by a plugin then it overrides the default standard error logging, which
    writes each line to the 'stderr' child of the task logger.
    """

    failure_info: Optional[Dict[str, Any]]
    """
    Upon run failure, the implementation may provide additional structured information about what
    went wrong (beyond the exit code and log messages).
    """

    # True only while _run() is executing; guards against add_paths()/run() misuse mid-run
    _running: bool

    def __init__(self, cfg: config.Loader, run_id: str, host_dir: str) -> None:
        self.cfg = cfg
        self.run_id = run_id
        self.host_dir = host_dir
        self.container_dir = "/mnt/miniwdl_task_container"
        self.input_path_map = {}
        self.input_path_map_rev = {}
        self.stderr_callback = None
        self.try_counter = 1
        self._running = False
        self.runtime_values = {}
        self.failure_info = None
        # provision the attempt's working directory; deliberately NOT exist_ok, so that a stale
        # directory from a previous run surfaces as an error rather than being silently reused
        os.makedirs(self.host_work_dir())

    def add_paths(self, host_paths: Iterable[str]) -> None:
        """
        Use before running the container to add a list of host paths to mount inside the container
        as inputs. Directory paths should have a trailing slash. The host-to-container path mapping
        is maintained in ``input_path_map``.

        Although ``add_paths`` can be used multiple times, paths should be added together where
        possible, as this allows heuristics for dealing with any name collisions among them.
        """
        assert not self._running

        # partition the files by host directory
        host_paths_by_dir: Dict[str, Set[str]] = {}
        for host_path in host_paths:
            host_path_strip = host_path.rstrip("/")
            # skip paths already mapped (checking both with and without trailing slash)
            if host_path not in self.input_path_map and host_path_strip not in self.input_path_map:
                if not os.path.exists(host_path_strip):
                    raise Error.InputError("input path not found: " + host_path)
                host_paths_by_dir.setdefault(os.path.dirname(host_path_strip), set()).add(host_path)

        # for each such partition of files
        # - if there are no basename collisions under input subdirectory 0, then mount them there.
        # - otherwise, mount them in a fresh subdirectory
        for paths in host_paths_by_dir.values():
            based = os.path.join(self.container_dir, "work/_miniwdl_inputs")
            subd = "0"
            # first pass: detect any basename collision with already-mapped paths, and if found,
            # switch the whole partition to a fresh subdirectory number
            for host_path in paths:
                container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/")))
                if host_path.endswith("/"):
                    container_path += "/"
                if container_path in self.input_path_map_rev:
                    assert subd == "0"
                    subd = str(len(self.input_path_map) + 1)
            # second pass: record the (now collision-free) mappings in both directions
            for host_path in paths:
                container_path = os.path.join(based, subd, os.path.basename(host_path.rstrip("/")))
                if host_path.endswith("/"):
                    container_path += "/"
                assert container_path not in self.input_path_map_rev
                self.input_path_map[host_path] = container_path
                self.input_path_map_rev[container_path] = host_path

    def copy_input_files(self, logger: logging.Logger) -> None:
        # After add_paths has been used as needed, copy the input files from their original
        # locations to the appropriate subdirectories of the container working directory. This may
        # not be necessary e.g. if the container backend supports bind-mounting the input
        # files from their original host paths.
        # called once per task run (attempt)
        for host_path, container_path in self.input_path_map.items():
            assert container_path.startswith(self.container_dir)
            # derive the host-side copy destination from the in-container mount path
            host_copy_path = os.path.join(
                self.host_dir, os.path.relpath(container_path.rstrip("/"), self.container_dir)
            )

            logger.info(_("copy host input file", input=host_path, copy=host_copy_path))
            os.makedirs(os.path.dirname(host_copy_path), exist_ok=True)
            if host_path.endswith("/"):
                # trailing slash marks a Directory input; symlinks=False dereferences links
                shutil.copytree(host_path.rstrip("/"), host_copy_path, symlinks=False)
            else:
                shutil.copy(host_path, host_copy_path)

    def process_runtime(self, logger: logging.Logger, runtime_eval: Dict[str, Value.Base]) -> None:
        """
        Given the evaluated WDL expressions from the task runtime{} section, populate
        self.runtime_values with validated/postprocessed values that will be needed to configure
        the container properly.

        Subclasses may override this to process custom runtime entries (before or after invoking
        this base version).
        """

        ans = self.runtime_values

        if "inlineDockerfile" in runtime_eval:
            # join Array[String]
            dockerfile = runtime_eval["inlineDockerfile"]
            if not isinstance(dockerfile, Value.Array):
                # promote a lone String to a one-element Array for uniform handling
                dockerfile = Value.Array(dockerfile.type, [dockerfile])
            ans["inlineDockerfile"] = "\n".join(
                elt.coerce(Type.String()).value for elt in dockerfile.value
            )
        elif "docker" in runtime_eval or "container" in runtime_eval:
            # "container" (newer WDL spelling) takes precedence over "docker"
            docker_value = runtime_eval["container" if "container" in runtime_eval else "docker"]
            if isinstance(docker_value, Value.Array) and len(docker_value.value):
                # TODO: choose a preferred candidate
                docker_value = docker_value.value[0]
            ans["docker"] = docker_value.coerce(Type.String()).value
        if "docker_network" in runtime_eval:
            network_value = runtime_eval["docker_network"]
            ans["docker_network"] = network_value.coerce(Type.String()).value

        if (
            isinstance(runtime_eval.get("privileged", None), Value.Boolean)
            and runtime_eval["privileged"].value is True
        ):
            # privileged mode is opt-in via configuration; otherwise warn and ignore
            if self.cfg.get_bool("task_runtime", "allow_privileged"):
                ans["privileged"] = True
            else:
                logger.warning(
                    "runtime.privileged ignored; to enable, set configuration"
                    " [task_runtime] allow_privileged = true (security+portability warning)"
                )

        host_limits = self.detect_resource_limits(self.cfg, logger)
        if "cpu" in runtime_eval:
            cpu_value = math.ceil(runtime_eval["cpu"].coerce(Type.Float()).value)
            cpu_max = self.cfg["task_runtime"].get_int("cpu_max")
            # cpu_max == 0 means "use detected host limit"; negative means "no limit"
            if cpu_max == 0:
                cpu_max = host_limits["cpu"]
            cpu = max(1, cpu_value if cpu_value <= cpu_max or cpu_max < 0 else cpu_max)
            if cpu != cpu_value:
                logger.warning(
                    _("runtime.cpu adjusted to host limit", original=cpu_value, adjusted=cpu)
                )
            ans["cpu"] = cpu

        if "memory" in runtime_eval:
            memory_str = runtime_eval["memory"].coerce(Type.String()).value
            assert isinstance(memory_str, str)
            try:
                memory_bytes = parse_byte_size(memory_str)
            except ValueError:
                raise Error.RuntimeError("invalid setting of runtime.memory, " + memory_str)

            # memory_max follows the same convention as cpu_max: "-1" no limit, "0" host limit
            memory_max_str = self.cfg["task_runtime"]["memory_max"].strip()
            memory_max = -1 if memory_max_str == "-1" else parse_byte_size(memory_max_str)
            if memory_max == 0:
                memory_max = host_limits["mem_bytes"]
            if memory_max > 0 and memory_bytes > memory_max:
                logger.warning(
                    _(
                        "runtime.memory adjusted to host limit",
                        original=memory_bytes,
                        adjusted=memory_max,
                    )
                )
                memory_bytes = memory_max
            ans["memory_reservation"] = memory_bytes

            # optional hard limit as a multiple of the reservation (disabled when <= 0)
            memory_limit_multiplier = self.cfg["task_runtime"].get_float("memory_limit_multiplier")
            if memory_limit_multiplier > 0.0:
                ans["memory_limit"] = int(memory_limit_multiplier * memory_bytes)

        if "maxRetries" in runtime_eval:
            ans["maxRetries"] = max(0, runtime_eval["maxRetries"].coerce(Type.Int()).value)
        if "preemptible" in runtime_eval:
            ans["preemptible"] = max(0, runtime_eval["preemptible"].coerce(Type.Int()).value)
        if "returnCodes" in runtime_eval:
            # accepted forms: "*" (any exit code), a single Int, or an Array of Int
            rcv = runtime_eval["returnCodes"]
            if isinstance(rcv, Value.String) and rcv.value == "*":
                ans["returnCodes"] = "*"
            elif isinstance(rcv, Value.Int):
                ans["returnCodes"] = rcv.value
            elif isinstance(rcv, Value.Array):
                try:
                    ans["returnCodes"] = [v.coerce(Type.Int()).value for v in rcv.value]
                except Exception:
                    pass
            # anything else (including a non-Int array) falls through to this error
            if "returnCodes" not in ans:
                raise Error.RuntimeError("invalid setting of runtime.returnCodes")

        if "gpu" in runtime_eval:
            if not isinstance(runtime_eval["gpu"], Value.Boolean):
                raise Error.RuntimeError("invalid setting of runtime.gpu")
            ans["gpu"] = runtime_eval["gpu"].value

        if "acceleratorCount" in runtime_eval:
            # HealthOmics-style acceleratorCount:1 to gpu:true (FIXME for proper multi-GPU support)
            if not isinstance(runtime_eval["acceleratorCount"], Value.Int):
                raise Error.RuntimeError("invalid setting of runtime.acceleratorCount")
            if runtime_eval["acceleratorCount"].coerce(Type.Int()).value > 0:
                ans["gpu"] = True

    def run(self, logger: logging.Logger, command: str) -> None:
        """
        1. Container is instantiated with the configured mounts and resources
        2. The mounted directory and all subdirectories have u+rwx,g+rwx permission bits; all files
           within have u+rw,g+rw permission bits.
        3. Command is executed in host_work_dir() which is mounted to {container_dir}/work inside
           the container.
        4. Standard output is written to host_stdout_txt()
        5. Standard error is written to host_stderr_txt() and logged at VERBOSE level
        6. Raises CommandFailed for nonzero exit code
        7. Raises Terminated if TerminationSignalFlag detected, or Interrupted if the backend
           cancels on us for some reason that isn't our fault.

        The container is torn down in any case, including SIGTERM/SIGHUP signal which is trapped.
        """
        # container-specific logic should be in _run(). this wrapper traps signals

        assert not self._running
        if command.strip():  # if the command is empty then don't bother with any of this
            preamble = self.cfg.get("task_runtime", "command_preamble")
            if preamble.strip():
                command = preamble + "\n" + command
            with TerminationSignalFlag(logger) as terminating:
                # bail out early if a termination signal already arrived
                if terminating():
                    raise Terminated(quiet=True)
                self._running = True
                try:
                    exit_code = self._run(logger, terminating, command)
                finally:
                    self._running = False

                if not self.success_exit_code(exit_code):
                    # a failure observed while terminating is attributed to the termination
                    raise (
                        CommandFailed(
                            exit_code,
                            self.host_stderr_txt(),
                            self.host_stdout_txt(),
                            more_info=self.failure_info,
                        )
                        if not terminating()
                        else Terminated()
                    )

    @abstractmethod
    def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: str) -> int:
        """
        Implementation-specific: run command in container & return exit status.

        Take care to write informative log messages for any backend-specific errors. Miniwdl's
        outer exception handler will only emit a brief, generic log message about the run failing.
        """
        # run command in container & return exit status
        raise NotImplementedError()

    def success_exit_code(self, exit_code: int) -> bool:
        """
        Decide whether ``exit_code`` counts as success, honoring the validated
        ``runtime.returnCodes`` setting ("*" accepts any code; otherwise an int or list of ints;
        absent means only 0 succeeds).
        """
        if "returnCodes" not in self.runtime_values:
            return exit_code == 0
        rcv = self.runtime_values["returnCodes"]
        if isinstance(rcv, str) and rcv == "*":
            return True
        return exit_code in (rcv if isinstance(rcv, list) else [rcv])

    def delete_work(self, logger: logging.Logger, delete_streams: bool = False) -> None:
        """
        After the container exits, delete all filesystem traces of it except for task.log. That
        includes successful output files!

        delete_streams: if True, delete stdout.txt and stderr.txt as well
        """
        to_delete = [self.host_work_dir(), os.path.join(self.host_dir, "write_")]
        to_delete.append(os.path.join(self.host_dir, "command"))
        if delete_streams:
            to_delete.append(self.host_stdout_txt())
            to_delete.append(self.host_stderr_txt())
            # .offset is the stderr read-position bookmark left by PygtailLogger
            to_delete.append(self.host_stderr_txt() + ".offset")
        deleted = []
        for p in to_delete:
            if os.path.isdir(p):
                rmtree_atomic(p)
                deleted.append(p)
            elif os.path.isfile(p):
                with suppress(FileNotFoundError):
                    os.unlink(p)
                deleted.append(p)
        if deleted:
            logger.info(_("deleted task work artifacts", artifacts=deleted))

    def reset(self, logger: logging.Logger) -> None:
        """
        After a container/command failure, reset the working directory state so that
        copy_input_files() and run() can be retried.
        """
        # bumping try_counter changes host_work_dir()/host_std{out,err}_txt() names, so the new
        # attempt cannot clobber the failed attempt's artifacts
        self.try_counter += 1
        os.makedirs(self.host_work_dir())

    def host_path(self, container_path: str, inputs_only: bool = False) -> Optional[str]:
        """
        Map the in-container path of an output File/Directory to a host path under ``host_dir``.
        Directory paths should be given a trailing "/". Return None if the path does not exist.

        SECURITY: except for inputs, this method must only return host paths under ``host_dir``
        and prevent any reference to other host files (e.g. /etc/passwd), including via symlinks.
        """
        if os.path.isabs(container_path):
            # handle output of std{out,err}.txt
            if container_path == os.path.join(self.container_dir, "stdout.txt"):
                return self.host_stdout_txt()
            if container_path == os.path.join(self.container_dir, "stderr.txt"):
                return self.host_stderr_txt()
            # handle output of an input File or Directory
            if container_path in self.input_path_map_rev:
                return self.input_path_map_rev[container_path]
            # handle output of a File or subDirectory found within an input Directory
            container_path_components = container_path.strip("/").split("/")
            for i in range(len(container_path_components) - 1, 5, -1):
                # 5 == len(['mnt', 'miniwdl_task_container', 'work', '_miniwdl_inputs', '0'])
                container_path_prefix = "/" + "/".join(container_path_components[:i]) + "/"
                if container_path_prefix in self.input_path_map_rev:
                    ans = self.input_path_map_rev[container_path_prefix]
                    ans += "/".join(container_path_components[i:])
                    if container_path.endswith("/"):
                        ans += "/"
                    assert path_really_within(ans, self.input_path_map_rev[container_path_prefix])
                    return ans
            if inputs_only:
                raise Error.InputError(
                    "task inputs attempted to use a non-input or non-existent path "
                    + container_path
                )
            # relativize the path to the provisioned working directory
            container_relpath = os.path.relpath(
                container_path, os.path.join(self.container_dir, "work")
            )
            if container_path.endswith("/") and not container_relpath.endswith("/"):
                container_relpath += "/"
            if container_relpath.startswith("../"):
                # see issue #214
                raise OutputError(
                    "task outputs attempted to use a path outside its working directory: "
                    + container_path
                )
            container_path = container_relpath

        ans = os.path.join(self.host_work_dir(), container_path)
        if container_path.endswith("/") and not ans.endswith("/"):
            ans += "/"
        # require directory paths to name an existing directory and file paths an existing file
        if not (
            (container_path.endswith("/") and os.path.isdir(ans))
            or (not container_path.endswith("/") and os.path.isfile(ans))
        ):
            return None
        if not path_really_within(ans, self.host_work_dir()):
            # fail-safe guard against some weird symlink to host file
            raise OutputError(
                "task outputs attempted to use a path outside its working directory: "
                + container_path
            )
        if (
            ans.endswith("/")
            and self.input_path_map
            and (
                path_really_within(self.host_work_dir(), ans[:-1])
                or path_really_within(
                    ans[:-1], os.path.join(self.host_work_dir(), "_miniwdl_inputs")
                )
            )
        ):
            # prevent output of an input mount point
            raise OutputError("unusable output directory: " + container_path)
        return ans

    def host_work_dir(self) -> str:
        """Host path of this attempt's working directory (suffixed by try_counter on retries)."""
        return os.path.join(
            self.host_dir, f"work{self.try_counter if self.try_counter > 1 else ''}"
        )

    def host_stdout_txt(self) -> str:
        """Host path of this attempt's captured standard output."""
        return os.path.join(
            self.host_dir, f"stdout{self.try_counter if self.try_counter > 1 else ''}.txt"
        )

    def host_stderr_txt(self) -> str:
        """Host path of this attempt's captured standard error."""
        return os.path.join(
            self.host_dir, f"stderr{self.try_counter if self.try_counter > 1 else ''}.txt"
        )

    def touch_mount_point(self, host_path: str) -> None:
        """
        Implementation helper: touch a File or Directory mount point that might not already exist
        in the host directory. This ensures ownership by the invoking user:group.
        """
        assert host_path.startswith(self.host_dir + "/")
        if host_path.endswith("/"):  # Directory mount point
            os.makedirs(host_path, exist_ok=True)
        else:  # File mount point
            os.makedirs(os.path.dirname(host_path), exist_ok=True)
            # mode "x" (exclusive create) raises FileExistsError rather than truncating
            with open(host_path, "x") as _:
                pass

    def poll_stderr_context(self, logger: logging.Logger) -> ContextManager[Callable[[], None]]:
        """
        Implementation helper: open a context yielding a function to poll stderr.txt and log each
        line (to either logger or self.stderr_callback if set). _run() implementation should
        call the function periodically while container is running, and close the context once
        done/failed.
        """
        return PygtailLogger(
            logger,
            self.host_stderr_txt(),
            callback=self.stderr_callback,
        )

    def task_running_context(self) -> ContextManager[None]:
        """
        Implementation helper: open a context which counts the task, and its CPU and memory
        reservations, in the CLI status bar's "running" ticker. _run() implementation should open
        this context once the container is truly running (not while e.g. still queued), and close
        it once done/failed.
        """
        return _statusbar.task_running(
            self.runtime_values.get("cpu", 0),
            self.runtime_values.get("memory_reservation", 0),
        )
526

527

528
# Registry of available container backend classes, keyed by plugin name; lazily populated by
# new() from the "container_backend" plugin group on first use.
_backends: Dict[str, typing.Type[TaskContainer]] = dict()
# Serializes lazy population of _backends and the one-time backend global_init() call.
_backends_lock: threading.Lock = threading.Lock()
1✔
530

531

532
def new(cfg: config.Loader, logger: logging.Logger, run_id: str, host_dir: str) -> TaskContainer:
    """
    Instantiate a TaskContainer from the configured backend, including any necessary global
    initialization.
    """
    with _backends_lock:
        # lazily discover backend plugins the first time through
        if not _backends:
            _backends.update(
                (name, impl)  # type: ignore
                for name, impl in config.load_plugins(cfg, "container_backend")
            )
        backend = _backends[cfg["scheduler"]["container_backend"]]
        # run the backend's one-time initialization exactly once per process, flagging the
        # class itself so subsequent calls skip it
        if not getattr(backend, "_global_init", False):
            backend.global_init(cfg, logger)
            setattr(backend, "_global_init", True)
        container = backend(cfg, run_id, host_dir)
        assert isinstance(container, TaskContainer)
        return container
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc