chanzuckerberg / miniwdl, build 18765653123
Pull Request #804: Enable parsing struct meta sections (merge fbec52853 into ea178fe90, committed via GitHub web-flow)
24 Oct 2025 12:20AM UTC coverage: 95.188% (-0.05% from 95.241%)
27 of 30 new or added lines in 3 files covered (90.0%); 2 existing lines in 2 files now uncovered.
7478 of 7856 relevant lines covered (95.19%); 0.95 hits per line.

Source file: /WDL/runtime/task.py (95.62% covered)

"""
Local task runner
"""

import logging
import math
import os
import json
import traceback
import glob
import threading
import shutil
import regex
from typing import Tuple, List, Dict, Optional, Callable, Set, Any, Union, TYPE_CHECKING
from contextlib import ExitStack, suppress
from collections import Counter

from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
from .._util import (
    write_atomic,
    write_values_json,
    provision_run_dir,
    TerminationSignalFlag,
    chmod_R_plus,
    path_really_within,
    LoggingFileHandler,
    compose_coroutines,
    pathsize,
    link_force,
    symlink_force,
    rmtree_atomic,
)
from .._util import StructuredLogMessage as _
from . import config, _statusbar
from .download import able as downloadable, run_cached as download
from .cache import CallCache, new as new_call_cache
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json

if TYPE_CHECKING:  # otherwise-delayed heavy imports
    from .task_container import TaskContainer


def run_local_task(  # type: ignore[return]
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container.

    :param run_id: unique ID for the run, defaults to workflow name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))
    with ExitStack() as cleanup:
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        if terminating():
            raise Terminated(quiet=True)

        # provision run directory and log file
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        logfile = os.path.join(run_dir, "task.log")
        cleanup.enter_context(
            LoggingFileHandler(
                logger,
                logfile,
            )
        )
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            cleanup.enter_context(
                LoggingFileHandler(
                    logger,
                    logfile + ".json",
                    json=True,
                )
            )
        logger.notice(
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        if not _run_id_stack:
            cache = _cache or cleanup.enter_context(new_call_cache(cfg, logger))
            cache.flock(logfile, exclusive=True)  # no containing workflow; flock task.log
        else:
            cache = _cache
        assert cache

        cleanup.enter_context(_statusbar.task_slotted())
        maybe_container = None
        try:
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
            cached = cache.get(cache_key, inputs, task.effective_outputs)
            if cached is not None:
                for decl in task.outputs:
                    v = cached[decl.name]
                    vj = json.dumps(v.json)
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=(v.json if len(vj) < 4096 else "(((large)))"),
                        )
                    )
                # create out/ and outputs.json
                _outputs = link_outputs(
                    cache,
                    cached,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")
                # returning `cached`, not the rewritten `_outputs`, to retain opportunity to find
                # cached downstream inputs
                return (run_dir, cached)
            # start plugin coroutines and process inputs through them
            with compose_coroutines(
                [
                    (
                        lambda kwargs, cor=cor: cor(  # type: ignore
                            cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
                        )
                    )
                    for cor in (
                        [cor2 for _, cor2 in sorted(config.load_plugins(cfg, "task"))]
                        + (_plugins or [])
                    )
                ],
                {"inputs": inputs},
            ) as plugins:
                recv = next(plugins)
                inputs = recv["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)
                maybe_container = container

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(task.effective_wdl_version, logger, container)
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )

                # interpolate command
                old_command_dedent = cfg["task_runtime"].get_bool("old_command_dedent")
                # pylint: disable=E1101
                placeholder_re = regex.compile(
                    cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX
                )
                setattr(
                    stdlib,
                    "_placeholder_regex",
                    placeholder_re,
                )  # hack to pass regex to WDL.Expr.Placeholder._eval
                assert isinstance(task.command, Expr.TaskCommand)
                command = task.command.eval(
                    container_env, stdlib, dedent=not old_command_dedent
                ).value
                delattr(stdlib, "_placeholder_regex")
                if old_command_dedent:  # see issue #674
                    command = _util.strip_leading_whitespace(command)[1]
                logger.debug(_("command", command=command.strip()))

                # process command & container through plugins
                recv = plugins.send({"command": command, "container": container})
                command, container = (recv[k] for k in ("command", "container"))

                # start container & run command (and retry if needed)
                _try_task(cfg, task, logger, container, command, terminating)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                recv = plugins.send({"outputs": outputs})
                outputs = recv["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")
                if not run_id.startswith("download-"):
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            tbtxt = traceback.format_exc()
            logger.debug(tbtxt)
            wrapper = RunFailed(task, run_id, run_dir)
            logmsg = _(
                str(wrapper),
                dir=run_dir,
                **error_json(
                    exn, traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None
                ),
            )
            if isinstance(exn, Terminated) and getattr(exn, "quiet", False):
                logger.debug(logmsg)
            else:
                logger.error(logmsg)
            try:
                write_atomic(
                    json.dumps(
                        error_json(
                            wrapper,
                            cause=exn,
                            traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None,
                        ),
                        indent=2,
                    ),
                    os.path.join(run_dir, "error.json"),
                )
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            try:
                if maybe_container:
                    _delete_work(cfg, logger, maybe_container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise wrapper from exn


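# A minimal usage sketch (illustrative only, not part of miniwdl): the calling
# convention for run_local_task under a default configuration. It assumes `task`
# is an already-typechecked Tree.Task declaring a String input "name"; loading
# and typechecking the enclosing WDL document is out of scope here.
def _example_run_local_task(cfg: config.Loader, task: Tree.Task) -> None:
    inputs: Env.Bindings[Value.Base] = Env.Bindings()
    inputs = inputs.bind("name", Value.String("world"))
    run_dir, outputs = run_local_task(cfg, task, inputs, run_id="example")
    for b in outputs:
        print(run_dir, b.name, json.dumps(b.value.json))

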
def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Find all File & Directory input values that are downloadable URIs (including any nested within
    compound values). Download them to some location under run_dir and return a copy of the inputs
    with the URI values replaced by the downloaded paths.
    """

    downloads = 0
    download_bytes = 0
    cached_hits = 0

    def rewriter(v: Union[Value.Directory, Value.File]) -> str:
        nonlocal downloads, download_bytes, cached_hits
        directory = isinstance(v, Value.Directory)
        uri = v.value
        if downloadable(cfg, uri, directory=directory):
            logger.info(_(f"download input {'directory' if directory else 'file'}", uri=uri))
            cached, filename = download(
                cfg,
                logger,
                cache,
                uri,
                directory=directory,
                run_dir=os.path.join(run_dir, "download", str(downloads), "."),
                logger_prefix=logger_prefix + [f"download{downloads}"],
            )
            if cached:
                cached_hits += 1
            else:
                sz = pathsize(filename)
                logger.info(_("downloaded input", uri=uri, path=filename, bytes=sz))
                downloads += 1
                download_bytes += sz
            return filename
        return uri

    ans = Value.rewrite_env_paths(inputs, rewriter)
    if downloads or cached_hits:
        logger.notice(
            _(
                "processed input URIs",
                downloaded=downloads,
                downloaded_bytes=download_bytes,
                cached=cached_hits,
            )
        )
    return ans


def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    Look for available File/Directory inputs that default to a string constant appearing to be a
    downloadable URI. For each one, add a binding for that default to the user-supplied inputs (if
    not already overridden in them).

    This is to trigger download of the default URIs even though we otherwise don't evaluate input
    declarations until after processing downloads.
    """
    ans = inputs
    for b in available_inputs:
        if (
            isinstance(b.value.type, (Type.File, Type.Directory))
            and b.name not in ans
            and isinstance(b.value.expr, Expr.String)
        ):
            directory = isinstance(b.value.type, Type.Directory)
            maybe_uri = b.value.expr.literal
            if maybe_uri and downloadable(cfg, maybe_uri.value, directory=directory):
                v = (
                    Value.Directory(maybe_uri.value, b.value.expr)
                    if directory
                    else Value.File(maybe_uri.value, b.value.expr)
                )
                ans = ans.bind(b.name, v)
    return ans


def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    # Preprocess inputs: if None value is supplied for an input declared with a default but without
    # the ? type quantifier, remove the binding entirely so that the default will be used. In
    # contrast, if the input declaration has an -explicitly- optional type, then we'll allow the
    # supplied None to override any default.
    input_decls = task.available_inputs
    posix_inputs = posix_inputs.filter(
        lambda b: not (
            isinstance(b.value, Value.Null)
            and b.name in input_decls
            and input_decls[b.name].expr
            and not input_decls[b.name].type.optional
        )
    )

    # Map all the provided input File & Directory paths to in-container paths
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    # copy posix_inputs with all File & Directory values mapped to their in-container paths
    def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
        p = fn.value.rstrip("/")
        if isinstance(fn, Value.Directory):
            p += "/"
        return container.input_path_map[p]

    container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths)

    # initialize value environment with the inputs
    container_env: Env.Bindings[Value.Base] = Env.Bindings()
    for b in container_inputs:
        assert isinstance(b, Env.Binding)
        v = b.value
        assert isinstance(v, Value.Base)
        container_env = container_env.bind(b.name, v)
        vj = json.dumps(v.json)
        logger.info(_("input", name=b.name, value=(v.json if len(vj) < 4096 else "(((large)))")))

    # collect remaining declarations requiring evaluation.
    decls_to_eval = []
    for decl in (task.inputs or []) + (task.postinputs or []):
        if not container_env.has_binding(decl.name):
            decls_to_eval.append(decl)

    # topsort them according to internal dependencies. prior static validation
    # should have ensured they're acyclic.
    decls_by_id, decls_adj = Tree._decl_dependency_matrix(decls_to_eval)
    decls_to_eval = [decls_by_id[did] for did in _util.topsort(decls_adj)]
    assert len(decls_by_id) == len(decls_to_eval)

    # evaluate each declaration in that order
    # note: the write_* functions call container.add_paths as a side-effect
    stdlib = InputStdLib(task.effective_wdl_version, logger, container)
    for decl in decls_to_eval:
        assert isinstance(decl, Tree.Decl)
        v = Value.Null()
        if decl.expr:
            try:
                v = decl.expr.eval(container_env, stdlib=stdlib).coerce(decl.type)
            except Error.RuntimeError as exn:
                setattr(exn, "job_id", decl.workflow_node_id)
                raise exn
            except Exception as exn:
                exn2 = Error.EvalError(decl, str(exn))
                setattr(exn2, "job_id", decl.workflow_node_id)
                raise exn2 from exn
        else:
            assert decl.type.optional
        vj = json.dumps(v.json)
        logger.info(_("eval", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))")))
        container_env = container_env.bind(decl.name, v)

    return container_env


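# To illustrate the None-filtering rule implemented above, consider a
# hypothetical task input section (example WDL, not from this repository):
#
#     input {
#         Int x = 1     # supplying {"x": None} drops the binding; default 1 applies
#         Int? y = 1    # supplying {"y": None} overrides the default with None
#     }

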
def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Get the unique paths of all File & Directory values in the environment. Directory paths will
    have a trailing '/'.
    """
    ans = set()

    def collector(v: Value.Base) -> None:
        if isinstance(v, Value.File):
            assert not v.value.endswith("/")
            ans.add(v.value)
        elif isinstance(v, Value.Directory):
            ans.add(v.value.rstrip("/") + "/")
        for ch in v.children:
            collector(ch)

    for b in env:
        collector(b.value)
    return ans


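# A small illustrative sketch (assuming Value.File/Value.Directory can be
# constructed directly from a path string, as elsewhere in this module) of the
# trailing-slash convention _fspaths uses to mark Directory paths:
def _example_fspaths() -> None:
    env: Env.Bindings[Value.Base] = Env.Bindings()
    env = env.bind("bam", Value.File("/data/sample.bam"))
    env = env.bind("ref", Value.Directory("/data/ref"))
    # Directory paths gain a trailing '/' so downstream path mapping can
    # distinguish them from plain files
    assert _fspaths(env) == {"/data/sample.bam", "/data/ref/"}

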
def _warn_input_basename_collisions(logger: logging.Logger, container: "TaskContainer") -> None:
    basenames = Counter(
        [os.path.basename((p[:-1] if p.endswith("/") else p)) for p in container.input_path_map_rev]
    )
    collisions = [nm for nm, n in basenames.items() if n > 1]
    if collisions:
        logger.warning(
            _(
                "mounting input files with colliding basenames in separate container directories",
                basenames=collisions,
            )
        )


def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    # evaluate runtime{} expressions (merged with any configured defaults)
    runtime_defaults = cfg.get_dict("task_runtime", "defaults")
    if run_id.startswith("download-"):
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values = {}
    for key, v in runtime_defaults.items():
        runtime_values[key] = Value.from_json(Type.Any(), v)
    for key, expr in task.runtime.items():  # evaluate expressions in source code
        runtime_values[key] = expr.eval(env, stdlib)
    for b in inputs.enter_namespace("runtime"):
        runtime_values[b.name] = b.value  # input overrides
    logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables
    container.runtime_values.setdefault("env", {})
    env_vars_override = {}
    env_vars_skipped = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is None:
            try:
                env_vars_override[ev_name] = os.environ[ev_name]
            except KeyError:
                env_vars_skipped.append(ev_name)
        else:
            env_vars_override[ev_name] = str(ev_value)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )
    if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    ):
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )
    if env_vars_override:
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(
            _(
                "overriding environment variables (portability warning)",
                names=list(env_vars_override.keys()),
            )
        )
        logger.debug(
            _("overriding environment variables (portability warning)", **env_vars_override)
        )
        container.runtime_values["env"].update(env_vars_override)

    # process decls with "env" decorator (EXPERIMENTAL)
    env_decls: Dict[str, Value.Base] = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is True:
            if not env_decls:
                logger.warning(
                    "task env declarations are an experimental feature, subject to change"
                )
            v = env[decl.name]
            if isinstance(v, (Value.String, Value.File, Value.Directory)):
                v = v.value
            else:
                v = json.dumps(v.json)
            env_decls[decl.name] = v
    container.runtime_values["env"].update(env_decls)

    unused_keys = list(
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    )
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))


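# An illustrative sketch of the precedence just implemented, using plain dicts
# in place of the Value/Env machinery: configured defaults are overridden by
# runtime{} expressions in the WDL source, which are in turn overridden by any
# "runtime.*" call inputs.
def _example_runtime_precedence() -> None:
    defaults = {"docker": "ubuntu:20.04", "cpu": 1}  # [task_runtime] defaults
    source_exprs = {"cpu": 4}  # evaluated from the task's runtime{} section
    input_overrides = {"cpu": 8}  # e.g. a runtime.cpu call input
    merged = {**defaults, **source_exprs, **input_overrides}
    assert merged == {"docker": "ubuntu:20.04", "cpu": 8}

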
def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "TaskContainer",
    command: str,
    terminating: Callable[[], bool],
) -> None:
    """
    Run the task command in the container, retrying up to runtime.preemptible occurrences of
    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    max_retries = container.runtime_values.get("maxRetries", 0)
    max_interruptions = container.runtime_values.get("preemptible", 0)
    retries = 0
    interruptions = 0

    while True:
        if terminating():
            raise Terminated()
        # copy input files, if needed
        if cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        ):
            container.copy_input_files(logger)
        host_tmpdir = (
            os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")
            if cfg.get_bool("file_io", "mount_tmpdir")
            or task.name in cfg.get_list("file_io", "mount_tmpdir_for")
            else None
        )

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                return container.run(logger, command)
            finally:
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                if (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
                ):
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions,
                        max_interruptions=max_interruptions,
                    )
                )
                interruptions += 1
            elif (
                not isinstance(exn, (Terminated, DockerBuildError))
                and retries < max_retries
                and not terminating()
            ):
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries,
                        max_retries=max_retries,
                    )
                )
                retries += 1
            else:
                raise
            _delete_work(cfg, logger, container, False)
            container.reset(logger)


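# For illustration, a task with the following (hypothetical) runtime section
# would let _try_task tolerate up to two Interrupted occurrences plus one retry
# of any other failure, i.e. at most four command attempts in total:
#
#     runtime {
#         preemptible: 2
#         maxRetries: 1
#     }

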
def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    stdout_file = os.path.join(container.host_dir, "stdout.txt")
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_file) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing the user where to find it
            stdout_used = False
            expr_stack = [outp.expr for outp in task.outputs]
            while expr_stack:
                expr = expr_stack.pop()
                assert isinstance(expr, Expr.Base)
                if isinstance(expr, Expr.Apply) and expr.function_name == "stdout":
                    stdout_used = True
                else:
                    expr_stack.extend(expr.children)  # type: ignore[arg-type]
            if not stdout_used:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_file,
                    )
                )

    # Helpers to rewrite File/Directory from in-container paths to host paths
    # First pass -- convert nonexistent output paths to None/Null
    def rewriter1(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        if container.host_path(container_path) is None:
            logger.warning(
                _(
                    "output path not found in container (error unless declared type is optional)",
                    output=output_name,
                    path=container_path,
                )
            )
            return None
        return v.value

    # Second pass -- convert in-container paths to host paths
    def rewriter2(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        host_path = container.host_path(container_path)
        assert host_path is not None
        if isinstance(v, Value.Directory):
            if host_path.endswith("/"):
                host_path = host_path[:-1]
            _check_directory(host_path, output_name)
            logger.debug(_("output dir", container=container_path, host=host_path))
        else:
            logger.debug(_("output file", container=container_path, host=host_path))
        return host_path

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs: Env.Bindings[Value.Base] = Env.Bindings()
    for decl in task.outputs:
        assert decl.expr
        try:
            v = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        except Error.RuntimeError as exn:
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        except Exception as exn:
            exn2 = Error.EvalError(decl, str(exn))
            setattr(exn2, "job_id", decl.workflow_node_id)
            raise exn2 from exn
        _warn_struct_extra(logger, decl.name, v)
        vj = json.dumps(v.json)
        logger.info(
            _("output", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))"))
        )

        # Now, a delicate sequence for postprocessing File outputs (including Files nested within
        # compound values)

        # First convert nonexistent paths to None/Null, and bind this in the environment for
        # evaluating subsequent output expressions.
        v = Value.rewrite_paths(v, lambda w: rewriter1(w, decl.name))
        env = env.bind(decl.name, v)
        # check if any nonexistent paths were provided for (non-optional) File/Directory types
        # Value.Null.coerce has a special behavior for us to raise FileNotFoundError for a
        # non-optional File/Directory type.
        try:
            v = v.coerce(decl.type)
        except FileNotFoundError:
            err = OutputError("File/Directory path not found in task output " + decl.name)
            setattr(err, "job_id", decl.workflow_node_id)
            raise err
        # Rewrite in-container paths to host paths
        v = Value.rewrite_paths(v, lambda w: rewriter2(w, decl.name))
        outputs = outputs.bind(decl.name, v)

    return outputs


def _check_directory(host_path: str, output_name: str) -> None:
    """
    traverse output directory to check that all symlinks are relative & resolve inside the dir
    """

    def raiser(exc: OSError):
        raise exc

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        for fn in files:
            fn = os.path.join(root, fn)
            if os.path.islink(fn) and (
                not os.path.exists(fn)
                or os.path.isabs(os.readlink(fn))
                or not path_really_within(fn, host_path)
            ):
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")


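# A runnable illustrative sketch of the symlink policy above (assuming a typical
# POSIX system where /etc/hosts exists): an absolute symlink inside an output
# directory is rejected, since it would not remain valid once the directory is
# linked or copied out of the task work dir.
def _example_check_directory() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as d:
        os.symlink("/etc/hosts", os.path.join(d, "escape"))  # absolute symlink
        try:
            _check_directory(d, "example_output")
        except OutputError as exn:
            print("rejected as expected:", exn)

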
def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Log notices about extraneous keys found in struct initialization from JSON/Map/Object
    """
    if warned_keys is None:
        warned_keys = set()
    if isinstance(v, Value.Struct) and v.extra:
        extra_keys = set(k for k in v.extra if not k.startswith("#"))
        if extra_keys - warned_keys:
            logger.notice(
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(extra_keys),
                )
            )
            warned_keys.update(extra_keys)
    for ch in v.children:
        _warn_struct_extra(logger, decl_name, ch, warned_keys)


def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    Following a successful run, the output files may be scattered throughout a complex directory
    tree used for execution. To help navigate this, generate a subdirectory of the run directory
    containing nicely organized symlinks to the output files, and rewrite File values in the
    outputs env to use these symlinks.
    """

    def link1(target: str, link: str, directory: bool) -> None:
        if hardlinks:
            # TODO: what if target is an input from a different filesystem?
            if directory:
                shutil.copytree(target, link, symlinks=True, copy_function=link_force)
            else:
                link_force(target, link)
        else:
            symlink_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        if isinstance(v, (Value.File, Value.Directory)):
            target = (
                v.value
                if os.path.exists(v.value)
                else cache.get_download(v.value, isinstance(v, Value.Directory))
            )
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                os.makedirs(dn, exist_ok=False)
                link1(target, link, isinstance(v, Value.Directory))
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if isinstance(v, Value.Directory):
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as _dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            d = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for i in range(len(v.value)):
                v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
        elif isinstance(v, Value.Map) and v.value:
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            keys_ok = (
                sum(
                    1
                    for b in v.value
                    if regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0]).strip("'\""))
                    is None
                )
                == 0
            )
            d = int(math.ceil(math.log10(len(v.value))))
            for i, b in enumerate(v.value):
                v.value[i] = (
                    b[0],
                    map_paths(
                        b[1],
                        os.path.join(
                            dn, str(b[0]).strip("'\"") if keys_ok else str(i).rjust(d, "0")
                        ),
                    ),
                )
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key in v.value:
                v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
        return v

    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    return outputs.map(
        lambda binding: Env.Binding(
            binding.name,
            map_paths(
                Value.rewrite_paths(binding.value, lambda v: v.value),  # nop to deep copy
                os.path.join(run_dir, "out", binding.name),
            ),
        )
    )


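# For illustration, a task with outputs `File report` and `Array[File] logs`
# would, in the default symlink mode, yield an out/ tree shaped roughly like
# the following, with each link pointing back into the work/ directory
# (hypothetical paths):
#
#     run_dir/out/report/report.txt -> ../../work/report.txt
#     run_dir/out/logs/0/a.log      -> ../../../work/a.log
#     run_dir/out/logs/1/b.log      -> ../../../work/b.log

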
def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.
    """
    link_destinations: Dict[str, str] = dict()

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        target = (
            v.value
            if os.path.exists(v.value)
            else cache.get_download(v.value, isinstance(v, Value.Directory))
        )
        if target:
            real_target = os.path.realpath(target)
            rel_link = None
            if path_really_within(target, os.path.join(run_dir, "work")):
                # target was generated by current task; use its path relative to the task work dir
                if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                    rel_link = os.path.relpath(
                        real_target, os.path.realpath(os.path.join(run_dir, "work"))
                    )
            else:
                # target is an out/ link generated by a call in the current workflow OR a cached
                # run; use the link's path relative to that out/ dir, which by induction should
                # equal its path relative to the original work/ dir.
                # we need a heuristic to find the out/ dir in a task/workflow run directory, since
                # the user's cwd or the task-generated relative path might coincidentally have
                # something named 'out'.
                p = None
                for p in reversed([m.span()[0] for m in regex.finditer("/out(?=/)", target)]):
                    if p and (
                        os.path.isfile(os.path.join(target[:p], "task.log"))
                        or os.path.isfile(os.path.join(target[:p], "workflow.log"))
                    ):
                        break
                    p = None
                if p and p + 5 < len(target):
                    rel_link = os.path.relpath(target, target[: p + 5])
            # if neither of the above cases applies, then fall back to just the target basename
            rel_link = rel_link or os.path.basename(target)
            abs_link = os.path.join(os.path.join(run_dir, "out"), rel_link)
            if link_destinations.get(abs_link, real_target) != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            os.makedirs(os.path.dirname(abs_link), exist_ok=True)
            link1(real_target, abs_link, isinstance(v, Value.Directory))
            link_destinations[abs_link] = real_target
            return abs_link
        return v.value

    return Value.rewrite_env_paths(outputs, map_path_relative)


def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    targets_by_basename: Dict[str, Set[str]] = {}

    def walker(v: Union[Value.File, Value.Directory]) -> str:
        target = v.value
        if os.path.exists(target):
            target = os.path.realpath(target)
        basename = os.path.basename(target)
        targets_by_basename.setdefault(basename, set()).add(target)
        return v.value

    Value.rewrite_env_paths(outputs, walker)

    collisions = [bn for bn, targets in targets_by_basename.items() if len(targets) > 1]
    if collisions:
        logger.warning(
            _(
                "multiple output files share the same basename; while miniwdl supports this,"
                " consider modifying WDL to ensure distinct output basenames",
                basenames=collisions,
            )
        )


def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[TaskContainer]",
    success: bool,
) -> None:
    opt = cfg["file_io"]["delete_work"].strip().lower()
    if container and (
        opt == "always" or (success and opt == "success") or (not success and opt == "failure")
    ):
        if success and not cfg["file_io"].get_bool("output_hardlinks"):
            logger.warning(
                "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
            )
            return
        container.delete_work(logger, delete_streams=not success)


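# For illustration, the cleanup policy above corresponds to configuration along
# these lines (hypothetical snippet; delete_work also accepts "always" and
# "failure", and on success it takes effect only with output_hardlinks = true):
#
#     [file_io]
#     delete_work = success
#     output_hardlinks = true

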
class _StdLib(StdLib.Base):
    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # if True then only permit access to input files

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
    ) -> None:
        super().__init__(wdl_version, write_dir=os.path.join(container.host_dir, "write_"))
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only

    def _devirtualize_filename(self, filename: str) -> str:
        # check allowability of reading this file, & map from in-container to host
        ans = self.container.host_path(filename, inputs_only=self.inputs_only)
        if ans is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=ans))
        return ans

    def _virtualize_filename(self, filename: str) -> str:
        # register new file with container input_path_map
        self.container.add_paths([filename])
        self.logger.debug(
            _("write_", host=filename, container=self.container.input_path_map[filename])
        )
        self.logger.info(_("wrote", file=self.container.input_path_map[filename]))
        return self.container.input_path_map[filename]


class InputStdLib(_StdLib):
    # StdLib for evaluation of task inputs and command
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, True)


class OutputStdLib(_StdLib):
    # StdLib for evaluation of task outputs
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, False)

        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: OutputStdLib = self) -> Value.Array:
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]
            # glob the host directory
            pat = os.path.join(lib.container.host_work_dir(), pat)
            host_files = sorted(fn for fn in glob.glob(pat) if os.path.isfile(fn))
            # convert the host filenames to in-container filenames
            container_files = []
            for hf in host_files:
                dstrip = lib.container.host_work_dir()
                dstrip += "" if dstrip.endswith("/") else "/"
                assert hf.startswith(dstrip)
                container_files.append(
                    os.path.join(lib.container.container_dir, "work", hf[len(dstrip) :])
                )
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )